diff --git a/.github/workflows/e2e-tests.yaml b/.github/workflows/e2e-tests.yaml index a6c1266..9c83d9b 100644 --- a/.github/workflows/e2e-tests.yaml +++ b/.github/workflows/e2e-tests.yaml @@ -90,6 +90,12 @@ jobs: - name: E2E - classical gang run: bash test/e2e/01-classical-gang.sh + - name: E2E - multi-pod gang (all-or-nothing + contention) + run: bash test/e2e/06-multi-gang.sh + + - name: E2E - PostFilter re-match (taint-rejected allocation) + run: bash test/e2e/05-postfilter-rematch.sh + - name: Deploy quantum add-on run: | # Includes the device plugin and oriented to testing container diff --git a/examples/multi-gang-contention.yaml b/examples/multi-gang-contention.yaml new file mode 100644 index 0000000..6aa439a --- /dev/null +++ b/examples/multi-gang-contention.yaml @@ -0,0 +1,27 @@ +# Multi-pod gang via the WEBHOOK path (the path the experiments use): pods carry +# the group LABEL + group-size annotation; the fluence webhook creates the +# PodGroup with minCount = group-size (3). All 3 must place or none. +apiVersion: apps/v1 +kind: Deployment +metadata: + name: gang3 +spec: + replicas: 3 + selector: + matchLabels: {app: gang3} + template: + metadata: + labels: + app: gang3 + fluence.flux-framework.org/group: gang3 + annotations: + fluence.flux-framework.org/group-size: "3" + spec: + schedulerName: fluence + containers: + - name: worker + image: busybox + command: ["sleep", "3600"] + resources: + requests: + cpu: "1" diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index a1a10e1..45c72cd 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -77,12 +77,21 @@ type Fluence struct { mu sync.Mutex // placement maps a group key to its allocation (nodes, backend, jobids). placement map[string]groupAlloc + // excludedNodes maps a group key to the set of node names that have been + // rejected for that group by other scheduler plugins (taints, affinity, + // volume topology that Fluxion's graph does not model). 
PostFilter adds the + // whole failed allocation's nodes here; PreFilter feeds them back as an RFC 31 + // negated-hostlist constraint so the re-match is forced onto untried nodes. + // The set only grows for a group, guaranteeing the retry converges (finite + // node pool) and is cleared on teardown. Guarded by mu. + excludedNodes map[string]map[string]bool } var ( - _ fwk.PreFilterPlugin = (*Fluence)(nil) - _ fwk.FilterPlugin = (*Fluence)(nil) - _ fwk.PreBindPlugin = (*Fluence)(nil) + _ fwk.PreFilterPlugin = (*Fluence)(nil) + _ fwk.FilterPlugin = (*Fluence)(nil) + _ fwk.PostFilterPlugin = (*Fluence)(nil) + _ fwk.PreBindPlugin = (*Fluence)(nil) ) // New builds the plugin: discover cluster nodes, optionally inject quantum @@ -161,10 +170,11 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error fluxion.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "") f := &Fluence{ - handle: h, - matcher: fluxion, - knownDevices: knownDevices, - placement: map[string]groupAlloc{}, + handle: h, + matcher: fluxion, + knownDevices: knownDevices, + placement: map[string]groupAlloc{}, + excludedNodes: map[string]map[string]bool{}, } f.registerCancelHandlers() // Periodic + startup reconcile of completed Fluence-created PodGroups, so a @@ -251,7 +261,15 @@ func (f *Fluence) PreFilter( return nil, fwk.AsStatus(err) } - specs, err := placement.JobspecsForGroup(group, pods, f.knownDevices) + f.mu.Lock() + excluded := make([]string, 0, len(f.excludedNodes[group])) + for n := range f.excludedNodes[group] { + excluded = append(excluded, n) + } + f.mu.Unlock() + sort.Strings(excluded) // deterministic constraint for stable matching/logs + + specs, err := placement.JobspecsForGroup(group, pods, f.knownDevices, excluded) if err != nil { return nil, fwk.AsStatus(err) } @@ -390,6 +408,69 @@ func (f *Fluence) Filter( return fwk.NewStatus(fwk.Unschedulable, "node not in fluxion allocation for this group") } +// PostFilter runs when a pod could not be scheduled 
after Filter — for a Fluence +// group, this means the cached Fluxion allocation's nodes did not all survive +// the other scheduler plugins' Filter checks (a taint, node affinity, or volume +// topology constraint that Fluxion's resource graph does not model rejected one +// or more of them). Without intervention the group would retry forever against +// the same cached allocation while the Fluxion reservation leaked, because +// PreFilter short-circuits on the cache and nothing else releases it on a +// scheduling failure. +// +// We react by abandoning the failed allocation: the ENTIRE cached node set is +// added to the group's exclusion set, the Fluxion jobids are cancelled, and the +// cached placement is deleted. The next PreFilter for the group re-matches with +// an RFC 31 negated-hostlist constraint over the accumulated exclusion set, so +// Fluxion is forced onto untried nodes. We exclude the whole set (not just the +// individually-rejected nodes) deliberately: if the group as a whole could not +// be admitted, a node that happened to survive this round carries no guarantee +// for the next, and excluding the whole set makes each retry a strictly smaller, +// monotonic search that converges — either to a feasible allocation on untried +// nodes, or to a clean no-match (Unschedulable) once the graph is exhausted, at +// which point the pod waits for a cluster-state change rather than busy-looping. +func (f *Fluence) PostFilter( + ctx context.Context, + state fwk.CycleState, + pod *corev1.Pod, + filteredNodeStatusMap fwk.NodeToStatusReader, +) (*fwk.PostFilterResult, *fwk.Status) { + group := groupKey(pod) + + f.mu.Lock() + alloc, ok := f.placement[group] + if !ok { + // No cached allocation for this group — nothing of ours to reconcile. + // (Another plugin's PostFilter, or a non-group pod.) + f.mu.Unlock() + return nil, fwk.NewStatus(fwk.Unschedulable) + } + // Accumulate the whole failed allocation's nodes into the exclusion set. 
+ if f.excludedNodes[group] == nil { + f.excludedNodes[group] = map[string]bool{} + } + for _, n := range alloc.place.Nodes { + f.excludedNodes[group][n] = true + } + excludedCount := len(f.excludedNodes[group]) + jobids := alloc.jobids + delete(f.placement, group) + f.mu.Unlock() + + // Release the Fluxion reservation for the abandoned allocation so the graph + // does not leak it while the group retries. + f.cancelJobids(jobids) + + log.Printf("[fluence] group %s unschedulable: abandoning allocation (nodes %v, "+ + "jobids %v); %d node(s) now excluded, will re-match on next cycle", + group, alloc.place.Nodes, jobids, excludedCount) + + // Returning Unschedulable (no nominated node) lets the pod be requeued; the + // next PreFilter re-matches with the enlarged exclusion set. We do not + // nominate a node — Fluxion, not PostFilter preemption, chooses the next + // placement. + return nil, fwk.NewStatus(fwk.Unschedulable) +} + // PreBindPreFlight runs before PreBind. It returns Success when we have a cached // allocation for the pod's group (so PreBind can record the jobid, and stamp the // backend for a quantum pod), and Skip otherwise. 
@@ -718,6 +799,7 @@ func (f *Fluence) cancelGroup(key string, ann map[string]string) { f.mu.Lock() delete(f.placement, key) + delete(f.excludedNodes, key) // drop accumulated exclusions so a future group reusing the name starts clean f.mu.Unlock() } diff --git a/pkg/fluence/fluence_test.go b/pkg/fluence/fluence_test.go index 998e1a7..6a53b56 100644 --- a/pkg/fluence/fluence_test.go +++ b/pkg/fluence/fluence_test.go @@ -1,6 +1,7 @@ package fluence import ( + "context" "errors" "testing" @@ -12,6 +13,7 @@ import ( schedv1a2 "k8s.io/api/scheduling/v1alpha2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" + fwk "k8s.io/kube-scheduler/framework" ) // fakeMatcher records Cancel calls so cancel behavior can be asserted without @@ -46,7 +48,11 @@ func (m *fakeMatcher) Cancel(jobid uint64) error { } func newTestFluence(m matcher) *Fluence { - return &Fluence{matcher: m, placement: map[string]groupAlloc{}} + return &Fluence{ + matcher: m, + placement: map[string]groupAlloc{}, + excludedNodes: map[string]map[string]bool{}, + } } func ann(jobid string) map[string]string { @@ -345,3 +351,103 @@ func twoSpecs() []*jobspec.Jobspec { {Version: 9999}, } } + +// --- PostFilter allocation reconciliation ----------------------------------- + +// PostFilter must abandon a group's failed allocation: add the WHOLE cached node +// set to the exclusion set, cancel the Fluxion jobids, and delete the cache, so +// the next PreFilter re-matches onto untried nodes. 
+func TestPostFilterAbandonsAndExcludesWholeNodeSet(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + key := "default/training" + f.placement[key] = groupAlloc{ + place: placement.Placement{Nodes: []string{"node-a", "node-b", "node-c"}}, + jobids: []uint64{11, 12}, + } + pod := groupedPod("default", "training-0", "training", nil) + + _, status := f.PostFilter(context.Background(), nil, pod, nil) + if status == nil || status.Code() != fwk.Unschedulable { + t.Fatalf("expected Unschedulable status, got %v", status) + } + // cache cleared + if _, still := f.placement[key]; still { + t.Fatal("placement cache should be deleted after PostFilter") + } + // jobids cancelled + if len(m.cancelled) != 2 { + t.Fatalf("expected both jobids cancelled, got %v", m.cancelled) + } + // the WHOLE node set excluded + excl := f.excludedNodes[key] + for _, n := range []string{"node-a", "node-b", "node-c"} { + if !excl[n] { + t.Fatalf("expected %s excluded, set=%v", n, excl) + } + } + if len(excl) != 3 { + t.Fatalf("expected exactly 3 excluded nodes, got %v", excl) + } +} + +// Repeated failures accumulate monotonically: a second abandoned allocation adds +// its nodes to the existing exclusion set (the set only grows -> convergence). 
+func TestPostFilterAccumulatesAcrossAttempts(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + key := "default/training" + pod := groupedPod("default", "training-0", "training", nil) + + // attempt 1 fails on {a,b} + f.placement[key] = groupAlloc{place: placement.Placement{Nodes: []string{"node-a", "node-b"}}, jobids: []uint64{1}} + f.PostFilter(context.Background(), nil, pod, nil) + // attempt 2 (re-matched elsewhere) fails on {c,d} + f.placement[key] = groupAlloc{place: placement.Placement{Nodes: []string{"node-c", "node-d"}}, jobids: []uint64{2}} + f.PostFilter(context.Background(), nil, pod, nil) + + excl := f.excludedNodes[key] + for _, n := range []string{"node-a", "node-b", "node-c", "node-d"} { + if !excl[n] { + t.Fatalf("expected %s in accumulated exclusion set, got %v", n, excl) + } + } + if len(excl) != 4 { + t.Fatalf("exclusion set should accumulate to 4, got %v", excl) + } +} + +// PostFilter on a group with no cached allocation (not ours, or already cleared) +// is a safe no-op: no panic, no cancel, returns Unschedulable. +func TestPostFilterUnknownGroupNoop(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + pod := groupedPod("default", "stranger-0", "stranger", nil) + + _, status := f.PostFilter(context.Background(), nil, pod, nil) + if status == nil || status.Code() != fwk.Unschedulable { + t.Fatalf("expected Unschedulable, got %v", status) + } + if len(m.cancelled) != 0 { + t.Fatalf("nothing should be cancelled for an unknown group, got %v", m.cancelled) + } + if len(f.excludedNodes) != 0 { + t.Fatalf("no exclusion set should be created for an unknown group, got %v", f.excludedNodes) + } +} + +// Teardown (cancelGroup) must clear the exclusion set so a future group reusing +// the same key does not inherit stale exclusions. 
+func TestCancelGroupClearsExclusions(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + key := "default/training" + f.placement[key] = groupAlloc{jobids: []uint64{9}} + f.excludedNodes[key] = map[string]bool{"node-a": true} + + f.cancelGroup(key, ann("9")) + + if _, still := f.excludedNodes[key]; still { + t.Fatal("exclusion set should be cleared on teardown") + } +} diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index 554f319..c7f76de 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -214,14 +214,36 @@ func withEntries(counts map[string]int) []jobspec.Resource { // allocation (duration 0 runs to graph end) plus an RFC 31 property constraint // selecting the eligible node set. properties is the AND-set of composed // key=value property strings a matched node must carry. -func systemAttributes(properties []string) map[string]interface{} { +func systemAttributes(properties []string, excludeNodes []string) map[string]interface{} { + // Base property constraint (the eligible-node property AND-set). + constraints := map[string]interface{}{ + "properties": properties, + } + // When a group has had a placement rejected by other scheduler plugins + // (taints, affinity, volume topology that Fluxion's graph does not model), + // PostFilter accumulates the rejected hostnames and we AND in an RFC 31 + // negated hostlist so the re-match is forced onto untried nodes. RFC 31 is + // JsonLogic-style ({operator:[values]}, one operator per object), so to AND + // two operators we nest them under an explicit `and`. We only do this when + // there is something to exclude, so the no-exclusion jobspec is byte-for-byte + // what it was before (and existing tests/behavior are unchanged). 
+ if len(excludeNodes) > 0 { + constraints = map[string]interface{}{ + "and": []interface{}{ + map[string]interface{}{"properties": properties}, + map[string]interface{}{ + "not": []interface{}{ + map[string]interface{}{"hostlist": excludeNodes}, + }, + }, + }, + } + } return map[string]interface{}{ "system": map[string]interface{}{ // duration 0 => hold the allocation until we explicitly Cancel. - "duration": 0, - "constraints": map[string]interface{}{ - "properties": properties, - }, + "duration": 0, + "constraints": constraints, }, } } @@ -229,7 +251,7 @@ func systemAttributes(properties []string) map[string]interface{} { // computeJobspec builds the physical-compute jobspec for a group: one slot per // pod holding the compute resources, constrained to virtual=false nodes. This is // the only jobspec for a group that requests no virtual devices. -func computeJobspec(groupName string, slots int, compute map[string]int) *jobspec.Jobspec { +func computeJobspec(groupName string, slots int, compute map[string]int, excludeNodes []string) *jobspec.Jobspec { return &jobspec.Jobspec{ Version: 9999, Resources: []jobspec.Resource{{ @@ -238,7 +260,7 @@ func computeJobspec(groupName string, slots int, compute map[string]int) *jobspe Label: "default", With: withEntries(compute), }}, - Attributes: systemAttributes([]string{VirtualPropertyFalse}), + Attributes: systemAttributes([]string{VirtualPropertyFalse}, excludeNodes), Tasks: []jobspec.Task{{ Command: []string{groupName}, Slot: "default", @@ -272,7 +294,7 @@ func deviceJobspec(groupName, deviceType string, count int, extraProps []string) Label: "device", With: []jobspec.Resource{{Type: "node", Count: count}}, }}, - Attributes: systemAttributes(props), + Attributes: systemAttributes(props, nil), Tasks: []jobspec.Task{{ Command: []string{groupName}, Slot: "device", @@ -299,6 +321,7 @@ func JobspecsForGroup( groupName string, pods []corev1.Pod, knownDevices map[string]bool, + excludeNodes []string, ) ([]*jobspec.Jobspec, 
error) { if len(pods) == 0 { return nil, fmt.Errorf("pod group %q has no pods", groupName) @@ -321,7 +344,7 @@ func JobspecsForGroup( } } - specs := []*jobspec.Jobspec{computeJobspec(groupName, len(pods), compute)} + specs := []*jobspec.Jobspec{computeJobspec(groupName, len(pods), compute, excludeNodes)} // Deterministic device order for stable output. deviceTypes := make([]string, 0, len(devices)) diff --git a/pkg/placement/placement_test.go b/pkg/placement/placement_test.go index 33786c8..fe68917 100644 --- a/pkg/placement/placement_test.go +++ b/pkg/placement/placement_test.go @@ -64,7 +64,7 @@ func TestClassicalSingleMatch(t *testing.T) { podWith("p0", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), podWith("p1", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), } - specs, err := JobspecsForGroup("grp", pods, nil) + specs, err := JobspecsForGroup("grp", pods, nil, nil) if err != nil { t.Fatal(err) } @@ -101,7 +101,7 @@ func TestGroupDeviceMatchWhenLeaderNotFirst(t *testing.T) { }) // Leader deliberately placed last. pods := []corev1.Pod{worker, worker, leader} - specs, err := JobspecsForGroup("qgrp", pods, map[string]bool{"qpu": true}) + specs, err := JobspecsForGroup("qgrp", pods, map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -132,7 +132,7 @@ func qpuPodWithRequires(name string, requires map[string]string) corev1.Pod { // constraints, nothing extra (over-constraining would break unconstrained runs). func TestNoRequireAnnotationsAddsNoConstraints(t *testing.T) { p := qpuPodWithRequires("q", nil) - specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}) + specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -145,7 +145,7 @@ func TestNoRequireAnnotationsAddsNoConstraints(t *testing.T) { // Exactly one require- constraint. 
func TestSingleRequireConstraint(t *testing.T) { p := qpuPodWithRequires("q", map[string]string{"qrmi_type": "braket-gate"}) - specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}) + specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -169,7 +169,7 @@ func TestMultipleRequireConstraintsAreDeduped(t *testing.T) { // a worker that happens to repeat one of the same require- annotations worker := qpuPodWithRequires("w0", map[string]string{"vendor": "amazon"}) specs, err := JobspecsForGroup("g", []corev1.Pod{leader, worker}, - map[string]bool{"qpu": true}) + map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -211,7 +211,7 @@ func TestRequireAnnotationConstrainsDevice(t *testing.T) { leader.Annotations[RequireAnnotationPrefix+"vendor"] = "amazon" specs, err := JobspecsForGroup("qgrp", []corev1.Pod{leader}, - map[string]bool{"qpu": true}) + map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -232,7 +232,7 @@ func TestDeviceProducesSecondMatch(t *testing.T) { FluxionResourcePrefix + "qpu": qty(1), }) known := map[string]bool{"qpu": true} - specs, err := JobspecsForGroup("qgrp", []corev1.Pod{p}, known) + specs, err := JobspecsForGroup("qgrp", []corev1.Pod{p}, known, nil) if err != nil { t.Fatal(err) } @@ -274,7 +274,7 @@ func TestDeviceProducesSecondMatch(t *testing.T) { // node), so there are two matches: compute (core=1, virtual=false) and device. func TestDeviceOnlyStillForcesCompute(t *testing.T) { p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qpu": qty(1)}) - specs, err := JobspecsForGroup("qonly", []corev1.Pod{p}, map[string]bool{"qpu": true}) + specs, err := JobspecsForGroup("qonly", []corev1.Pod{p}, map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -289,7 +289,7 @@ func TestDeviceOnlyStillForcesCompute(t *testing.T) { // Requesting a device type the graph does not model is a hard error. 
func TestUnknownDeviceErrors(t *testing.T) { p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "fpga": qty(1)}) - _, err := JobspecsForGroup("grp", []corev1.Pod{p}, map[string]bool{"qpu": true}) + _, err := JobspecsForGroup("grp", []corev1.Pod{p}, map[string]bool{"qpu": true}, nil) if err == nil { t.Fatal("expected an error for an unmodeled device type") } @@ -301,7 +301,7 @@ func TestHoldDurationZero(t *testing.T) { corev1.ResourceCPU: qty(1), FluxionResourcePrefix + "qpu": qty(1), }) - specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}) + specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -366,3 +366,76 @@ func TestPlacementUnmarkedNodeIsCompute(t *testing.T) { t.Fatalf("unmarked node should not be a backend, got %q", p.Backend) } } + +// When excludeNodes is non-empty, the compute jobspec's constraint must AND the +// base properties with an RFC 31 negated hostlist, so a re-match avoids the +// rejected nodes. When empty, the constraint must be the plain properties form +// (byte-for-byte the pre-exclusion behavior). 
+func TestExcludeNodesAddsNegatedHostlist(t *testing.T) { + p := podWith("p", corev1.ResourceList{corev1.ResourceCPU: qty(1)}) + + // no exclusion -> plain properties, no `and`/`not` + specs, err := JobspecsForGroup("g", []corev1.Pod{p}, nil, nil) + if err != nil { + t.Fatal(err) + } + cons := computeConstraints(t, specs[0]) + if _, hasAnd := cons["and"]; hasAnd { + t.Fatalf("no-exclusion constraint must not use `and`: %#v", cons) + } + if _, hasProps := cons["properties"]; !hasProps { + t.Fatalf("no-exclusion constraint must have plain properties: %#v", cons) + } + + // with exclusion -> and[ properties, not[ hostlist ] ] + specs, err = JobspecsForGroup("g", []corev1.Pod{p}, nil, []string{"node-b", "node-c"}) + if err != nil { + t.Fatal(err) + } + cons = computeConstraints(t, specs[0]) + andTerms, ok := cons["and"].([]interface{}) + if !ok || len(andTerms) != 2 { + t.Fatalf("exclusion constraint must be `and` of 2 terms: %#v", cons) + } + // find the not/hostlist term + foundHostlist := false + for _, term := range andTerms { + tm, _ := term.(map[string]interface{}) + notTerm, ok := tm["not"].([]interface{}) + if !ok || len(notTerm) == 0 { + continue + } + inner, _ := notTerm[0].(map[string]interface{}) + hl, ok := inner["hostlist"].([]string) + if !ok { + // json round-trip may make it []interface{}; accept both + if hlAny, ok2 := inner["hostlist"].([]interface{}); ok2 { + if len(hlAny) == 2 { + foundHostlist = true + } + } + continue + } + if len(hl) == 2 { + foundHostlist = true + } + } + if !foundHostlist { + t.Fatalf("exclusion constraint must contain not[hostlist[2 nodes]]: %#v", cons) + } +} + +// computeConstraints digs out attributes.system.constraints from the compute +// jobspec (the first spec; device specs do not carry node exclusions). 
+func computeConstraints(t *testing.T, spec *jobspec.Jobspec) map[string]interface{} { + t.Helper() + sys, ok := spec.Attributes["system"].(map[string]interface{}) + if !ok { + t.Fatalf("no system attributes: %#v", spec.Attributes) + } + cons, ok := sys["constraints"].(map[string]interface{}) + if !ok { + t.Fatalf("no constraints: %#v", sys) + } + return cons +} diff --git a/pkg/webhook/handler.go b/pkg/webhook/handler.go index 82a1227..d3ca5eb 100644 --- a/pkg/webhook/handler.go +++ b/pkg/webhook/handler.go @@ -28,7 +28,7 @@ type MutatorAPI interface { // PodGroup operations (gang scheduling). Group identity is the value of the // group label, which the core treats as an opaque string. PodGroupLeader(ctx context.Context, namespace, group string) string - EnsurePodGroup(ctx context.Context, namespace, group, leaderPod string) + EnsurePodGroup(ctx context.Context, namespace, group, leaderPod string, minCount int32) RecordLeader(ctx context.Context, namespace, group, leaderPod string) // EnsureSidecarRBAC provisions the per-namespace ServiceAccount/Role/Binding diff --git a/pkg/webhook/handlers/gang.go b/pkg/webhook/handlers/gang.go index a6c6126..015f51c 100644 --- a/pkg/webhook/handlers/gang.go +++ b/pkg/webhook/handlers/gang.go @@ -2,11 +2,14 @@ package handlers import ( "context" + "log" + "strconv" "github.com/converged-computing/fluence/pkg/webhook" "github.com/converged-computing/fluence/pkg/webhook/spec" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func init() { @@ -31,12 +34,60 @@ func (h *gangHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod *cor // First pod admitted in the group creates the PodGroup and is recorded as // the admission-order leader. All pods are linked to the group. if m.PodGroupLeader(ctx, pod.Namespace, g) == "" { - m.EnsurePodGroup(ctx, pod.Namespace, g, pod.Name) + // minCount = full gang size N (leader + workers) from the group-size + // annotation, so the whole group schedules atomically. 
NOT expected-workers + // (that is N-1, the workers the sidecar ungates). Absent -> 1. + m.EnsurePodGroup(ctx, pod.Namespace, g, pod.Name, resolveMinCount(ctx, m, pod)) m.RecordLeader(ctx, pod.Namespace, g, pod.Name) } return schedulingGroupOps(pod, g) } +// resolveMinCount determines the gang's atomic-schedule size N: +// 1. explicit group-size annotation -> honor it verbatim. This is the override +// for when minCount must differ from the parent's replica count (e.g. the +// quantum leader/worker split, where the gang's N is expressed directly). +// 2. otherwise derive from the OWNING object: a Flux Operator MiniCluster pod +// is owned by an indexed Job whose parallelism == completions == size == N. +// (The operator sets Parallelism = Completions = MiniCluster.Spec.Size.) +// 3. otherwise default to 1, logged — never silently size a multi-pod gang to 1. +// +// The leader/worker (quantum) split is orthogonal and unchanged: it is driven by +// RoleAnnotation / QuantumResource in the quantum handler. minCount is always the +// FULL gang N regardless of which pods get gated. +func resolveMinCount(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod) int32 { + // 1. explicit override + if pod.Annotations != nil { + if n := pod.Annotations[webhook.GroupSizeAnnotation]; n != "" { + if v, err := strconv.Atoi(n); err == nil && v > 0 { + return int32(v) + } + } + } + // 2. derive from the owning Job's parallelism + if c := m.Client(); c != nil { + for _, ref := range pod.OwnerReferences { + if ref.Kind != "Job" { + continue + } + job, err := c.BatchV1().Jobs(pod.Namespace).Get(ctx, ref.Name, metav1.GetOptions{}) + if err != nil { + break + } + if job.Spec.Parallelism != nil && *job.Spec.Parallelism > 0 { + return *job.Spec.Parallelism + } + if job.Spec.Completions != nil && *job.Spec.Completions > 0 { + return *job.Spec.Completions + } + } + } + // 3. no signal: a single-pod gang. 
Log so a missing size on a multi-pod + // workload is visible rather than a silent gang-of-1. + log.Printf("[fluence-webhook] group %s: no group-size annotation and no owning Job parallelism; defaulting minCount=1", webhook.GroupName(pod)) + return 1 +} + // schedulingGroupOps links a pod to its PodGroup via the native 1.36 field // spec.schedulingGroup.podGroupName. Idempotent if already linked. func schedulingGroupOps(pod *corev1.Pod, group string) []spec.Op { diff --git a/pkg/webhook/handlers/gang_test.go b/pkg/webhook/handlers/gang_test.go new file mode 100644 index 0000000..77f7f46 --- /dev/null +++ b/pkg/webhook/handlers/gang_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2024 Lawrence Livermore National Security, LLC + (c.f. AUTHORS, NOTICE.LLNS, COPYING) +SPDX-License-Identifier: Apache-2.0 +*/ + +// Tests for gang PodGroup minCount: the whole gang (full N) must schedule +// atomically. Regression guard for the bug where every PodGroup was created +// with minCount=1, so a multi-pod gang was "satisfied" by a single pod and the +// rest were stranded (partial placement). +package handlers + +import ( + "context" + "testing" + + "strconv" + + "github.com/converged-computing/fluence/pkg/webhook" + + corev1 "k8s.io/api/core/v1" + + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" +) + +// minCountOf runs the gang handler for the leader pod of a group and returns the +// minCount of the PodGroup the webhook created. +func minCountOf(t *testing.T, pod *corev1.Pod) int32 { + t.Helper() + m := &webhook.Mutator{Clientset: fake.NewSimpleClientset()} + m.Mutate(context.Background(), pod) + pg, err := m.Clientset.SchedulingV1alpha2(). 
+ PodGroups(pod.Namespace).Get(context.Background(), webhook.GroupName(pod), metav1.GetOptions{}) + if err != nil { + t.Fatalf("PodGroup not created: %v", err) + } + if pg.Spec.SchedulingPolicy.Gang == nil { + t.Fatal("PodGroup has no gang scheduling policy") + } + return pg.Spec.SchedulingPolicy.Gang.MinCount +} + +// minCountWithClient runs the gang handler with a pre-seeded clientset (so the +// owning Job exists) and returns the created PodGroup's minCount. +func minCountWithClient(t *testing.T, pod *corev1.Pod, objs ...interface{}) int32 { + t.Helper() + cs := fake.NewSimpleClientset(toRuntime(objs)...) + m := &webhook.Mutator{Clientset: cs} + m.Mutate(context.Background(), pod) + pg, err := cs.SchedulingV1alpha2().PodGroups(pod.Namespace). + Get(context.Background(), webhook.GroupName(pod), metav1.GetOptions{}) + if err != nil { + t.Fatalf("PodGroup not created: %v", err) + } + return pg.Spec.SchedulingPolicy.Gang.MinCount +} + +func jobWithParallelism(ns, name string, n int32) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns}, + Spec: batchv1.JobSpec{Parallelism: &n, Completions: &n}, + } +} + +func ownedBy(pod *corev1.Pod, kind, name string) { + pod.OwnerReferences = append(pod.OwnerReferences, + metav1.OwnerReference{Kind: kind, Name: name}) +} + +// No annotation, but the pod is owned by an indexed Job with parallelism N +// (the Flux Operator MiniCluster case: Parallelism == Completions == size == N). +// minCount must come from the Job. 
+func TestGangMinCountDerivedFromOwningJob(t *testing.T) { + pod := cpuPod("fluence") + pod.Namespace = "default" + pod.Labels = map[string]string{webhook.GroupLabel: "mc-gang"} + ownedBy(pod, "Job", "mc-gang-job") + got := minCountWithClient(t, pod, jobWithParallelism("default", "mc-gang-job", 4)) + if got != 4 { + t.Errorf("owner-derived: minCount=%d, want 4 (from Job parallelism)", got) + } +} + +// The explicit annotation OVERRIDES the owning Job's parallelism (the override +// exists precisely because minCount may differ from the parent replica count). +func TestGangMinCountAnnotationOverridesOwner(t *testing.T) { + pod := cpuPod("fluence") + pod.Namespace = "default" + pod.Labels = map[string]string{webhook.GroupLabel: "ovr-gang"} + pod.Annotations = map[string]string{webhook.GroupSizeAnnotation: "2"} + ownedBy(pod, "Job", "ovr-gang-job") + got := minCountWithClient(t, pod, jobWithParallelism("default", "ovr-gang-job", 8)) + if got != 2 { + t.Errorf("annotation override: minCount=%d, want 2 (annotation wins over Job=8)", got) + } +} + +// A classical gang of size N must get minCount = N so the whole group schedules +// atomically (this is the core multi-gang fix). +func atoi32(s string) int32 { v, _ := strconv.Atoi(s); return int32(v) } + +func toRuntime(objs []interface{}) []runtime.Object { + out := make([]runtime.Object, 0, len(objs)) + for _, o := range objs { + if ro, ok := o.(runtime.Object); ok { + out = append(out, ro) + } + } + return out +} + +func TestGangMinCountEqualsGroupSize(t *testing.T) { + for _, n := range []string{"2", "4", "8"} { + pod := cpuPod("fluence") + pod.Namespace = "default" + pod.Labels = map[string]string{webhook.GroupLabel: "g-" + n} + pod.Annotations = map[string]string{webhook.GroupSizeAnnotation: n} + got := minCountOf(t, pod) + want := atoi32(n) + if got != want { + t.Errorf("group-size=%s: minCount=%d, want %d", n, got, want) + } + } +} + +// No group-size annotation -> minCount falls back to 1 (single-pod gang). 
+func TestGangMinCountDefaultsToOne(t *testing.T) { + pod := cpuPod("fluence") + pod.Namespace = "default" + pod.Labels = map[string]string{webhook.GroupLabel: "g-default"} + if got := minCountOf(t, pod); got != 1 { + t.Errorf("absent group-size: minCount=%d, want 1", got) + } +} + +// Quantum distinction: a gang of full size N=4 that ALSO carries +// expected-workers=3 (the N-1 workers the sidecar ungates) must still get +// minCount=4 (the whole gang), NOT 3. minCount comes from group-size, not +// expected-workers. +func TestGangMinCountHonorsFullNWithQuantumSplit(t *testing.T) { + pod := cpuPod("fluence") + pod.Namespace = "default" + pod.Labels = map[string]string{webhook.GroupLabel: "q-gang"} + pod.Annotations = map[string]string{ + webhook.GroupSizeAnnotation: "4", // full N (leader + workers) + webhook.ExpectedWorkersAnnotation: "3", // N-1 workers to ungate + } + if got := minCountOf(t, pod); got != 4 { + t.Errorf("quantum gang: minCount=%d, want 4 (full N, not N-1)", got) + } +} diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index 20a7288..45af6da 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -71,6 +71,13 @@ const ( // opaque string and ascribes no meaning to it beyond propagation. ExpectedWorkersAnnotation = "fluence.flux-framework.org/expected-workers" + // GroupSizeAnnotation is the FULL gang member count N (leader + workers), + // set by the workload on each pod. It drives the PodGroup gang minCount so the + // whole group schedules atomically. This is distinct from + // ExpectedWorkersAnnotation (N-1: the workers the sidecar ungates; the leader + // is not gated). For a classical gang with no leader/worker split, N = size. + GroupSizeAnnotation = "fluence.flux-framework.org/group-size" + // Sidecar/staging infrastructure (generic — not quantum-specific). 
SidecarImage = "ghcr.io/converged-computing/fluence-sidecar:latest" SidecarServiceAccount = "fluence-sidecar" @@ -159,8 +166,13 @@ func (m *Mutator) PodGroupLeader(ctx context.Context, namespace, group string) s return "" } -// EnsurePodGroup creates a Fluence-owned PodGroup (minCount:1) if absent. -func (m *Mutator) EnsurePodGroup(ctx context.Context, namespace, group, leaderPod string) { +// EnsurePodGroup creates a Fluence-owned PodGroup with gang minCount = the full +// gang size N (the whole group schedules atomically) if absent. minCount<=0 +// falls back to 1. +func (m *Mutator) EnsurePodGroup(ctx context.Context, namespace, group, leaderPod string, minCount int32) { + if minCount <= 0 { + minCount = 1 + } if m.Clientset == nil { return } @@ -179,14 +191,14 @@ func (m *Mutator) EnsurePodGroup(ctx context.Context, namespace, group, leaderPo }, Spec: schedulingv1alpha2.PodGroupSpec{ SchedulingPolicy: schedulingv1alpha2.PodGroupSchedulingPolicy{ - Gang: &schedulingv1alpha2.GangSchedulingPolicy{MinCount: 1}, + Gang: &schedulingv1alpha2.GangSchedulingPolicy{MinCount: minCount}, }, }, } if _, err := m.Clientset.SchedulingV1alpha2().PodGroups(namespace).Create(ctx, pg, metav1.CreateOptions{}); err != nil { log.Printf("[fluence-webhook] could not create PodGroup %s/%s: %v", namespace, group, err) } else { - log.Printf("[fluence-webhook] created PodGroup %s/%s (minCount=1)", namespace, group) + log.Printf("[fluence-webhook] created PodGroup %s/%s (minCount=%d)", namespace, group, minCount) } } diff --git a/test/e2e/01-classical-gang.sh b/test/e2e/01-classical-gang.sh index d2018ac..ffe8fa8 100644 --- a/test/e2e/01-classical-gang.sh +++ b/test/e2e/01-classical-gang.sh @@ -27,3 +27,7 @@ count="$(kubectl get pods -l app=training --no-headers | wc -l | tr -d ' ')" log "PASS: classical gang placed all $count pods via fluence" kubectl delete -f examples/single-podgroup.yaml --wait=false || true kubectl patch podgroup training --type=merge -p 
'{"metadata":{"finalizers":null}}' 2>/dev/null || true +# Wait for the pods to actually be gone before the next test runs — otherwise a +# terminating 'training' pod (same name/labels reused by other scenarios) can be +# misread as the next test's placement. +kubectl wait --for=delete pod -l app=training --timeout=60s 2>/dev/null || true diff --git a/test/e2e/05-postfilter-rematch.sh b/test/e2e/05-postfilter-rematch.sh new file mode 100644 index 0000000..1712ea9 --- /dev/null +++ b/test/e2e/05-postfilter-rematch.sh @@ -0,0 +1,112 @@ +#!/usr/bin/env bash +# PostFilter re-match: when another scheduler plugin (TaintToleration) rejects a +# node Fluxion allocated, Fluence must abandon that allocation, exclude the node, +# and re-match onto an untainted node. Safety: the gang's RUNNING pod must NEVER +# bind to the tainted node. +# +# This test is self-isolating: it uses its own workload name (pf-rematch) and +# labels, distinct from the other e2e scenarios, and ensures a clean slate first, +# so a pod left over (terminating) from a previous test can never be mistaken for +# this test's placement. It also ignores terminating pods when asserting. +set -euo pipefail +HERE="$(cd "$(dirname "$0")" && pwd)"; . 
"${HERE}/lib.sh" + +NAME=pf-rematch +SEL="app=${NAME}" + +log "TEST 5: PostFilter abandons a taint-rejected allocation and re-matches" + +# --- clean slate: no leftover pods from earlier tests under our name ---------- +kubectl delete deployment "$NAME" --ignore-not-found >/dev/null 2>&1 || true +kubectl delete podgroup "$NAME" --ignore-not-found >/dev/null 2>&1 || true +kubectl patch podgroup "$NAME" --type=merge \ + -p '{"metadata":{"finalizers":null}}' >/dev/null 2>&1 || true +kubectl wait --for=delete pod -l "$SEL" --timeout=60s >/dev/null 2>&1 || true + +TAINTED="$(kubectl get nodes -l '!node-role.kubernetes.io/control-plane' \ + -o jsonpath='{.items[0].metadata.name}')" +[ -n "$TAINTED" ] || fail "no worker node found to taint" +log "tainting node $TAINTED with fluence-e2e=blocked:NoSchedule" +kubectl taint nodes "$TAINTED" fluence-e2e=blocked:NoSchedule --overwrite + +cleanup() { + kubectl taint nodes "$TAINTED" fluence-e2e- 2>/dev/null || true + kubectl delete deployment "$NAME" --ignore-not-found --wait=false 2>/dev/null || true + kubectl delete podgroup "$NAME" --ignore-not-found --wait=false 2>/dev/null || true + kubectl patch podgroup "$NAME" --type=merge \ + -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true +} +trap cleanup EXIT + +# --- our own workload (distinct name/labels; does NOT tolerate the taint) ------ +kubectl apply -f - <" for empty fields, so an empty deletionTimestamp + # shows as "", NOT "". Treat "" as empty for both columns. 
+  if [ "$deleted" != "<none>" ] && [ -n "$deleted" ]; then continue; fi   # skip terminating; kubectl prints "<none>" for an unset field
+  if [ "$node" = "<none>" ] || [ -z "$node" ]; then continue; fi          # skip not-yet-bound (no nodeName yet)
+  checked=$((checked+1))
+  if [ "$node" = "$TAINTED" ]; then
+    fail "SAFETY VIOLATION: running pod $name is bound to the tainted node $TAINTED"
+  fi
+  log "$name correctly placed on $node (not the tainted $TAINTED)"
+done < <(kubectl get pods -l "$SEL" \
+  -o custom-columns='N:.metadata.name,NODE:.spec.nodeName,DEL:.metadata.deletionTimestamp' \
+  --no-headers)
+
+[ "$checked" -ge 1 ] || fail "no running ${NAME} pod found to check"
+
+# Informational: did PostFilter actually fire (Fluxion picked the tainted node
+# first and we re-matched), or did Fluxion place on the good node directly?
+POD="$(kubectl -n kube-system get pods -l app=fluence \
+  -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)"
+if [ -n "$POD" ] && kubectl -n kube-system logs "$POD" 2>/dev/null \
+  | grep "unschedulable: abandoning allocation" >/dev/null; then   # not -q: an early exit would SIGPIPE kubectl and trip pipefail
+  log "observed PostFilter abandonment in scheduler log (re-match path exercised)"
+else
+  log "note: Fluxion placed on the untainted node directly this run (PostFilter not needed)"
+fi
+
+log "PASS: gang scheduled on an untainted node; no running pod on the tainted node"
diff --git a/test/e2e/06-multi-gang.sh b/test/e2e/06-multi-gang.sh
new file mode 100644
index 0000000..dc538d1
--- /dev/null
+++ b/test/e2e/06-multi-gang.sh
@@ -0,0 +1,80 @@
+#!/usr/bin/env bash
+# Multi-pod gang scheduling on real nodes. Guards the two failures that the
+# single-pod 01 test could NOT catch (and that shipped a minCount=1 bug):
+#   A) a 3-pod gang must place ALL 3 (minCount must equal the gang size, not 1)
+#   B) under contention, a gang that cannot fully fit stays ENTIRELY pending —
+#      never partially placed (no stranded pods holding nodes).
+set -euo pipefail
+HERE="$(cd "$(dirname "$0")" && pwd)"; . 
"${HERE}/lib.sh"
+
+# Tear everything down on ANY exit (mirrors the EXIT trap in
+# 05-postfilter-rematch.sh): without it a failed assertion leaves gang pods
+# holding nodes and skews whatever scenario runs next.
+cleanup() {
+  local g
+  kubectl delete -f examples/multi-gang.yaml --ignore-not-found --wait=false >/dev/null 2>&1 || true
+  kubectl delete -f examples/multi-gang-contention.yaml --ignore-not-found --wait=false >/dev/null 2>&1 || true
+  for g in gang3 gangA gangB; do
+    kubectl patch podgroup "$g" --type=merge -p '{"metadata":{"finalizers":null}}' >/dev/null 2>&1 || true
+  done
+}
+trap cleanup EXIT
+
+# ---- A) all-or-nothing placement of a 3-pod gang -------------------------------
+log "TEST 6A: multi-pod gang (3) places all-or-nothing"
+kubectl apply -f examples/multi-gang.yaml
+
+# the webhook must have created the PodGroup with minCount = 3 (the bug set it to 1)
+log "checking PodGroup minCount == 3 (set by webhook from group-size)"
+for i in $(seq 1 30); do
+  mc="$(kubectl get podgroup gang3 -o jsonpath='{.spec.schedulingPolicy.gang.minCount}' 2>/dev/null || true)"
+  [ -n "$mc" ] && break; sleep 2
+done
+[ "$mc" = "3" ] || fail "PodGroup gang3 minCount=$mc, want 3 (minCount=1 bug -> partial gangs)"
+
+log "waiting for all 3 gang pods to be Ready"
+wait_pods_ready "app=gang3" 3 180 || fail "gang3 did not place all 3 pods (gang scheduling failed)"
+
+count="$(kubectl get pods -l app=gang3 --field-selector=status.phase=Running --no-headers | wc -l | tr -d ' ')"
+[ "$count" = "3" ] || fail "expected 3 Running gang3 pods, got $count (partial placement)"
+for p in $(kubectl get pods -l app=gang3 -o name); do
+  pod="${p#pod/}"
+  sched="$(kubectl get pod "$pod" -o jsonpath='{.spec.schedulerName}')"
+  [ "$sched" = "fluence" ] || fail "$pod not scheduled by fluence (got: $sched)"
+done
+log "PASS 6A: 3-pod gang placed atomically by fluence (minCount=3)"
+
+kubectl delete -f examples/multi-gang.yaml --wait=false || true
+kubectl patch podgroup gang3 --type=merge -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true
+kubectl wait --for=delete pod -l app=gang3 --timeout=60s 2>/dev/null || true
+
+# ---- B) contention: the gang that can't fully fit stays ENTIRELY pending --------
+log "TEST 6B: contention — a gang that cannot fully fit must NOT partially place"
+kubectl apply -f examples/multi-gang-contention.yaml
+
+# wait until the cluster settles: exactly one gang fully Running, the other fully Pending.
+log "waiting for one gang to win placement"
+winner=""; loser=""
+for _ in $(seq 1 90); do
+  ra="$(kubectl get pods -l app=gangA --field-selector=status.phase=Running --no-headers 2>/dev/null | wc -l | tr -d ' ')"
+  rb="$(kubectl get pods -l app=gangB --field-selector=status.phase=Running --no-headers 2>/dev/null | wc -l | tr -d ' ')"
+  if [ "$ra" = "2" ] && [ "$rb" = "0" ]; then winner=gangA; loser=gangB; break; fi
+  if [ "$rb" = "2" ] && [ "$ra" = "0" ]; then winner=gangB; loser=gangA; break; fi
+  sleep 2
+done
+[ -n "$winner" ] || fail "neither gang reached a clean 2/0 placement (check for partial placement)"
+log "winner=$winner (2 running), loser=$loser (expected 0 running)"
+
+# the loser must have ZERO pods scheduled to a node — the all-or-nothing guarantee.
+# A single scheduled loser pod = partial placement = the bug. A kubectl failure must fail the test, not count as 0.
+scheduled_loser="$(kubectl get pods -l "app=$loser" -o jsonpath='{range .items[*]}{.spec.nodeName}{"\n"}{end}' | sed '/^$/d' | wc -l | tr -d ' ')" || fail "could not list $loser pods (kubectl failed) — cannot verify all-or-nothing"
+[ "$scheduled_loser" = "0" ] || fail "$loser has $scheduled_loser pod(s) on a node — PARTIAL placement (gang violated)"
+log "PASS 6B: $loser stayed entirely pending — no partial placement under contention"
+
+kubectl delete -f examples/multi-gang-contention.yaml --wait=false || true
+for g in gangA gangB; do
+  kubectl patch podgroup "$g" --type=merge -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true
+done
+kubectl wait --for=delete pod -l app=gangA --timeout=60s 2>/dev/null || true
+kubectl wait --for=delete pod -l app=gangB --timeout=60s 2>/dev/null || true
+log "PASS: multi-gang all-or-nothing verified"