Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/e2e-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ jobs:
- name: E2E - classical gang
run: bash test/e2e/01-classical-gang.sh

- name: E2E - multi-pod gang (all-or-nothing + contention)
run: bash test/e2e/06-multi-gang.sh

- name: E2E - PostFilter re-match (taint-rejected allocation)
run: bash test/e2e/05-postfilter-rematch.sh

- name: Deploy quantum add-on
run: |
# Includes the device plugin and oriented to testing container
Expand Down
27 changes: 27 additions & 0 deletions examples/multi-gang-contention.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Multi-pod gang via the WEBHOOK path (the path the experiments use): pods carry
# the group LABEL + group-size annotation; the fluence webhook creates the
# PodGroup with minCount = group-size (3). All 3 must place or none.
apiVersion: apps/v1
kind: Deployment
metadata:
name: gang3
spec:
replicas: 3
selector:
matchLabels: {app: gang3}
template:
metadata:
labels:
app: gang3
fluence.flux-framework.org/group: gang3
annotations:
fluence.flux-framework.org/group-size: "3"
spec:
schedulerName: fluence
containers:
- name: worker
image: busybox
command: ["sleep", "3600"]
resources:
requests:
cpu: "1"
98 changes: 90 additions & 8 deletions pkg/fluence/fluence.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,21 @@ type Fluence struct {
mu sync.Mutex
// placement maps a group key to its allocation (nodes, backend, jobids).
placement map[string]groupAlloc
// excludedNodes maps a group key to the set of node names that have been
// rejected for that group by other scheduler plugins (taints, affinity,
// volume topology that Fluxion's graph does not model). PostFilter adds the
// whole failed allocation's nodes here; PreFilter feeds them back as an RFC 31
// negated-hostlist constraint so the re-match is forced onto untried nodes.
// The set only grows for a group, guaranteeing the retry converges (finite
// node pool) and is cleared on teardown. Guarded by mu.
excludedNodes map[string]map[string]bool
}

var (
_ fwk.PreFilterPlugin = (*Fluence)(nil)
_ fwk.FilterPlugin = (*Fluence)(nil)
_ fwk.PreBindPlugin = (*Fluence)(nil)
_ fwk.PreFilterPlugin = (*Fluence)(nil)
_ fwk.FilterPlugin = (*Fluence)(nil)
_ fwk.PostFilterPlugin = (*Fluence)(nil)
_ fwk.PreBindPlugin = (*Fluence)(nil)
)

// New builds the plugin: discover cluster nodes, optionally inject quantum
Expand Down Expand Up @@ -161,10 +170,11 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error
fluxion.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "")

f := &Fluence{
handle: h,
matcher: fluxion,
knownDevices: knownDevices,
placement: map[string]groupAlloc{},
handle: h,
matcher: fluxion,
knownDevices: knownDevices,
placement: map[string]groupAlloc{},
excludedNodes: map[string]map[string]bool{},
}
f.registerCancelHandlers()
// Periodic + startup reconcile of completed Fluence-created PodGroups, so a
Expand Down Expand Up @@ -251,7 +261,15 @@ func (f *Fluence) PreFilter(
return nil, fwk.AsStatus(err)
}

specs, err := placement.JobspecsForGroup(group, pods, f.knownDevices)
f.mu.Lock()
excluded := make([]string, 0, len(f.excludedNodes[group]))
for n := range f.excludedNodes[group] {
excluded = append(excluded, n)
}
f.mu.Unlock()
sort.Strings(excluded) // deterministic constraint for stable matching/logs

specs, err := placement.JobspecsForGroup(group, pods, f.knownDevices, excluded)
if err != nil {
return nil, fwk.AsStatus(err)
}
Expand Down Expand Up @@ -390,6 +408,69 @@ func (f *Fluence) Filter(
return fwk.NewStatus(fwk.Unschedulable, "node not in fluxion allocation for this group")
}

// PostFilter runs when a pod could not be scheduled after Filter — for a Fluence
// group, this means the cached Fluxion allocation's nodes did not all survive
// the other scheduler plugins' Filter checks (a taint, node affinity, or volume
// topology constraint that Fluxion's resource graph does not model rejected one
// or more of them). Without intervention the group would retry forever against
// the same cached allocation while the Fluxion reservation leaked, because
// PreFilter short-circuits on the cache and nothing else releases it on a
// scheduling failure.
//
// We react by abandoning the failed allocation: the ENTIRE cached node set is
// added to the group's exclusion set, the Fluxion jobids are cancelled, and the
// cached placement is deleted. The next PreFilter for the group re-matches with
// an RFC 31 negated-hostlist constraint over the accumulated exclusion set, so
// Fluxion is forced onto untried nodes. We exclude the whole set (not just the
// individually-rejected nodes) deliberately: if the group as a whole could not
// be admitted, a node that happened to survive this round carries no guarantee
// for the next, and excluding the whole set makes each retry a strictly smaller,
// monotonic search that converges — either to a feasible allocation on untried
// nodes, or to a clean no-match (Unschedulable) once the graph is exhausted, at
// which point the pod waits for a cluster-state change rather than busy-looping.
func (f *Fluence) PostFilter(
ctx context.Context,
state fwk.CycleState,
pod *corev1.Pod,
filteredNodeStatusMap fwk.NodeToStatusReader,
) (*fwk.PostFilterResult, *fwk.Status) {
group := groupKey(pod)

f.mu.Lock()
alloc, ok := f.placement[group]
if !ok {
// No cached allocation for this group — nothing of ours to reconcile.
// (Another plugin's PostFilter, or a non-group pod.)
f.mu.Unlock()
return nil, fwk.NewStatus(fwk.Unschedulable)
}
// Accumulate the whole failed allocation's nodes into the exclusion set.
if f.excludedNodes[group] == nil {
f.excludedNodes[group] = map[string]bool{}
}
for _, n := range alloc.place.Nodes {
f.excludedNodes[group][n] = true
}
excludedCount := len(f.excludedNodes[group])
jobids := alloc.jobids
delete(f.placement, group)
f.mu.Unlock()

// Release the Fluxion reservation for the abandoned allocation so the graph
// does not leak it while the group retries.
f.cancelJobids(jobids)

log.Printf("[fluence] group %s unschedulable: abandoning allocation (nodes %v, "+
"jobids %v); %d node(s) now excluded, will re-match on next cycle",
group, alloc.place.Nodes, jobids, excludedCount)

// Returning Unschedulable (no nominated node) lets the pod be requeued; the
// next PreFilter re-matches with the enlarged exclusion set. We do not
// nominate a node — Fluxion, not PostFilter preemption, chooses the next
// placement.
return nil, fwk.NewStatus(fwk.Unschedulable)
}

// PreBindPreFlight runs before PreBind. It returns Success when we have a cached
// allocation for the pod's group (so PreBind can record the jobid, and stamp the
// backend for a quantum pod), and Skip otherwise.
Expand Down Expand Up @@ -718,6 +799,7 @@ func (f *Fluence) cancelGroup(key string, ann map[string]string) {

f.mu.Lock()
delete(f.placement, key)
delete(f.excludedNodes, key) // drop accumulated exclusions so a future group reusing the name starts clean
f.mu.Unlock()
}

Expand Down
108 changes: 107 additions & 1 deletion pkg/fluence/fluence_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fluence

import (
"context"
"errors"
"testing"

Expand All @@ -12,6 +13,7 @@ import (
schedv1a2 "k8s.io/api/scheduling/v1alpha2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
fwk "k8s.io/kube-scheduler/framework"
)

// fakeMatcher records Cancel calls so cancel behavior can be asserted without
Expand Down Expand Up @@ -46,7 +48,11 @@ func (m *fakeMatcher) Cancel(jobid uint64) error {
}

func newTestFluence(m matcher) *Fluence {
return &Fluence{matcher: m, placement: map[string]groupAlloc{}}
return &Fluence{
matcher: m,
placement: map[string]groupAlloc{},
excludedNodes: map[string]map[string]bool{},
}
}

func ann(jobid string) map[string]string {
Expand Down Expand Up @@ -345,3 +351,103 @@ func twoSpecs() []*jobspec.Jobspec {
{Version: 9999},
}
}

// --- PostFilter allocation reconciliation -----------------------------------

// PostFilter must abandon a group's failed allocation: add the WHOLE cached node
// set to the exclusion set, cancel the Fluxion jobids, and delete the cache, so
// the next PreFilter re-matches onto untried nodes.
func TestPostFilterAbandonsAndExcludesWholeNodeSet(t *testing.T) {
m := &fakeMatcher{}
f := newTestFluence(m)
key := "default/training"
f.placement[key] = groupAlloc{
place: placement.Placement{Nodes: []string{"node-a", "node-b", "node-c"}},
jobids: []uint64{11, 12},
}
pod := groupedPod("default", "training-0", "training", nil)

_, status := f.PostFilter(context.Background(), nil, pod, nil)
if status == nil || status.Code() != fwk.Unschedulable {
t.Fatalf("expected Unschedulable status, got %v", status)
}
// cache cleared
if _, still := f.placement[key]; still {
t.Fatal("placement cache should be deleted after PostFilter")
}
// jobids cancelled
if len(m.cancelled) != 2 {
t.Fatalf("expected both jobids cancelled, got %v", m.cancelled)
}
// the WHOLE node set excluded
excl := f.excludedNodes[key]
for _, n := range []string{"node-a", "node-b", "node-c"} {
if !excl[n] {
t.Fatalf("expected %s excluded, set=%v", n, excl)
}
}
if len(excl) != 3 {
t.Fatalf("expected exactly 3 excluded nodes, got %v", excl)
}
}

// Repeated failures accumulate monotonically: a second abandoned allocation adds
// its nodes to the existing exclusion set (the set only grows -> convergence).
func TestPostFilterAccumulatesAcrossAttempts(t *testing.T) {
m := &fakeMatcher{}
f := newTestFluence(m)
key := "default/training"
pod := groupedPod("default", "training-0", "training", nil)

// attempt 1 fails on {a,b}
f.placement[key] = groupAlloc{place: placement.Placement{Nodes: []string{"node-a", "node-b"}}, jobids: []uint64{1}}
f.PostFilter(context.Background(), nil, pod, nil)
// attempt 2 (re-matched elsewhere) fails on {c,d}
f.placement[key] = groupAlloc{place: placement.Placement{Nodes: []string{"node-c", "node-d"}}, jobids: []uint64{2}}
f.PostFilter(context.Background(), nil, pod, nil)

excl := f.excludedNodes[key]
for _, n := range []string{"node-a", "node-b", "node-c", "node-d"} {
if !excl[n] {
t.Fatalf("expected %s in accumulated exclusion set, got %v", n, excl)
}
}
if len(excl) != 4 {
t.Fatalf("exclusion set should accumulate to 4, got %v", excl)
}
}

// PostFilter on a group with no cached allocation (not ours, or already cleared)
// is a safe no-op: no panic, no cancel, returns Unschedulable.
func TestPostFilterUnknownGroupNoop(t *testing.T) {
m := &fakeMatcher{}
f := newTestFluence(m)
pod := groupedPod("default", "stranger-0", "stranger", nil)

_, status := f.PostFilter(context.Background(), nil, pod, nil)
if status == nil || status.Code() != fwk.Unschedulable {
t.Fatalf("expected Unschedulable, got %v", status)
}
if len(m.cancelled) != 0 {
t.Fatalf("nothing should be cancelled for an unknown group, got %v", m.cancelled)
}
if len(f.excludedNodes) != 0 {
t.Fatalf("no exclusion set should be created for an unknown group, got %v", f.excludedNodes)
}
}

// Teardown (cancelGroup) must clear the exclusion set so a future group reusing
// the same key does not inherit stale exclusions.
func TestCancelGroupClearsExclusions(t *testing.T) {
m := &fakeMatcher{}
f := newTestFluence(m)
key := "default/training"
f.placement[key] = groupAlloc{jobids: []uint64{9}}
f.excludedNodes[key] = map[string]bool{"node-a": true}

f.cancelGroup(key, ann("9"))

if _, still := f.excludedNodes[key]; still {
t.Fatal("exclusion set should be cleared on teardown")
}
}
41 changes: 32 additions & 9 deletions pkg/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,44 @@ func withEntries(counts map[string]int) []jobspec.Resource {
// allocation (duration 0 runs to graph end) plus an RFC 31 property constraint
// selecting the eligible node set. properties is the AND-set of composed
// key=value property strings a matched node must carry.
func systemAttributes(properties []string) map[string]interface{} {
func systemAttributes(properties []string, excludeNodes []string) map[string]interface{} {
// Base property constraint (the eligible-node property AND-set).
constraints := map[string]interface{}{
"properties": properties,
}
// When a group has had a placement rejected by other scheduler plugins
// (taints, affinity, volume topology that Fluxion's graph does not model),
// PostFilter accumulates the rejected hostnames and we AND in an RFC 31
// negated hostlist so the re-match is forced onto untried nodes. RFC 31 is
// JsonLogic-style ({operator:[values]}, one operator per object), so to AND
// two operators we nest them under an explicit `and`. We only do this when
// there is something to exclude, so the no-exclusion jobspec is byte-for-byte
// what it was before (and existing tests/behavior are unchanged).
if len(excludeNodes) > 0 {
constraints = map[string]interface{}{
"and": []interface{}{
map[string]interface{}{"properties": properties},
map[string]interface{}{
"not": []interface{}{
map[string]interface{}{"hostlist": excludeNodes},
},
},
},
}
}
return map[string]interface{}{
"system": map[string]interface{}{
// duration 0 => hold the allocation until we explicitly Cancel.
"duration": 0,
"constraints": map[string]interface{}{
"properties": properties,
},
"duration": 0,
"constraints": constraints,
},
}
}

// computeJobspec builds the physical-compute jobspec for a group: one slot per
// pod holding the compute resources, constrained to virtual=false nodes. This is
// the only jobspec for a group that requests no virtual devices.
func computeJobspec(groupName string, slots int, compute map[string]int) *jobspec.Jobspec {
func computeJobspec(groupName string, slots int, compute map[string]int, excludeNodes []string) *jobspec.Jobspec {
return &jobspec.Jobspec{
Version: 9999,
Resources: []jobspec.Resource{{
Expand All @@ -238,7 +260,7 @@ func computeJobspec(groupName string, slots int, compute map[string]int) *jobspe
Label: "default",
With: withEntries(compute),
}},
Attributes: systemAttributes([]string{VirtualPropertyFalse}),
Attributes: systemAttributes([]string{VirtualPropertyFalse}, excludeNodes),
Tasks: []jobspec.Task{{
Command: []string{groupName},
Slot: "default",
Expand Down Expand Up @@ -272,7 +294,7 @@ func deviceJobspec(groupName, deviceType string, count int, extraProps []string)
Label: "device",
With: []jobspec.Resource{{Type: "node", Count: count}},
}},
Attributes: systemAttributes(props),
Attributes: systemAttributes(props, nil),
Tasks: []jobspec.Task{{
Command: []string{groupName},
Slot: "device",
Expand All @@ -299,6 +321,7 @@ func JobspecsForGroup(
groupName string,
pods []corev1.Pod,
knownDevices map[string]bool,
excludeNodes []string,
) ([]*jobspec.Jobspec, error) {
if len(pods) == 0 {
return nil, fmt.Errorf("pod group %q has no pods", groupName)
Expand All @@ -321,7 +344,7 @@ func JobspecsForGroup(
}
}

specs := []*jobspec.Jobspec{computeJobspec(groupName, len(pods), compute)}
specs := []*jobspec.Jobspec{computeJobspec(groupName, len(pods), compute, excludeNodes)}

// Deterministic device order for stable output.
deviceTypes := make([]string, 0, len(devices))
Expand Down
Loading
Loading