From b3b6716c7ac52b18832e52a8eafb19afaab95a2a Mon Sep 17 00:00:00 2001 From: vsoch Date: Wed, 24 Jun 2026 22:35:59 -0700 Subject: [PATCH 1/2] wip: postfilter rematch I want to support having other filters (extension points) running with fluence, which means that fluence might make a scheduling decision that is then rejected by some other plugin. If this happens we need to cancel the allocation and record the node names to give as constraints the next time around. This will likely take me a few days to work on, and I am thinking we want to replicate our original experiments in this paper to demosntrate the basic gang scheduling before we go into quantum / custom resources. Signed-off-by: vsoch --- .github/workflows/e2e-tests.yaml | 3 + pkg/fluence/fluence.go | 98 +++++++++++++++++++++++--- pkg/fluence/fluence_test.go | 108 +++++++++++++++++++++++++++- pkg/placement/placement.go | 41 ++++++++--- pkg/placement/placement_test.go | 93 ++++++++++++++++++++++--- test/e2e/01-classical-gang.sh | 4 ++ test/e2e/05-postfilter-rematch.sh | 112 ++++++++++++++++++++++++++++++ 7 files changed, 431 insertions(+), 28 deletions(-) create mode 100644 test/e2e/05-postfilter-rematch.sh diff --git a/.github/workflows/e2e-tests.yaml b/.github/workflows/e2e-tests.yaml index a6c1266..6ab7613 100644 --- a/.github/workflows/e2e-tests.yaml +++ b/.github/workflows/e2e-tests.yaml @@ -90,6 +90,9 @@ jobs: - name: E2E - classical gang run: bash test/e2e/01-classical-gang.sh + - name: E2E - PostFilter re-match (taint-rejected allocation) + run: bash test/e2e/05-postfilter-rematch.sh + - name: Deploy quantum add-on run: | # Includes the device plugin and oriented to testing container diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index a1a10e1..45c72cd 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -77,12 +77,21 @@ type Fluence struct { mu sync.Mutex // placement maps a group key to its allocation (nodes, backend, jobids). placement map[string]groupAlloc + // excludedNodes maps a group key to the set of node names that have been + // rejected for that group by other scheduler plugins (taints, affinity, + // volume topology that Fluxion's graph does not model). PostFilter adds the + // whole failed allocation's nodes here; PreFilter feeds them back as an RFC 31 + // negated-hostlist constraint so the re-match is forced onto untried nodes. + // The set only grows for a group, guaranteeing the retry converges (finite + // node pool) and is cleared on teardown. Guarded by mu. + excludedNodes map[string]map[string]bool } var ( - _ fwk.PreFilterPlugin = (*Fluence)(nil) - _ fwk.FilterPlugin = (*Fluence)(nil) - _ fwk.PreBindPlugin = (*Fluence)(nil) + _ fwk.PreFilterPlugin = (*Fluence)(nil) + _ fwk.FilterPlugin = (*Fluence)(nil) + _ fwk.PostFilterPlugin = (*Fluence)(nil) + _ fwk.PreBindPlugin = (*Fluence)(nil) ) // New builds the plugin: discover cluster nodes, optionally inject quantum @@ -161,10 +170,11 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error fluxion.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "") f := &Fluence{ - handle: h, - matcher: fluxion, - knownDevices: knownDevices, - placement: map[string]groupAlloc{}, + handle: h, + matcher: fluxion, + knownDevices: knownDevices, + placement: map[string]groupAlloc{}, + excludedNodes: map[string]map[string]bool{}, } f.registerCancelHandlers() // Periodic + startup reconcile of completed Fluence-created PodGroups, so a @@ -251,7 +261,15 @@ func (f *Fluence) PreFilter( return nil, fwk.AsStatus(err) } - specs, err := placement.JobspecsForGroup(group, pods, f.knownDevices) + f.mu.Lock() + excluded := make([]string, 0, len(f.excludedNodes[group])) + for n := range f.excludedNodes[group] { + excluded = append(excluded, n) + } + f.mu.Unlock() + sort.Strings(excluded) // deterministic constraint for stable matching/logs + + specs, err := placement.JobspecsForGroup(group, pods, f.knownDevices, excluded) if err != nil { return nil, fwk.AsStatus(err) } @@ -390,6 +408,69 @@ func (f *Fluence) Filter( return fwk.NewStatus(fwk.Unschedulable, "node not in fluxion allocation for this group") } +// PostFilter runs when a pod could not be scheduled after Filter — for a Fluence +// group, this means the cached Fluxion allocation's nodes did not all survive +// the other scheduler plugins' Filter checks (a taint, node affinity, or volume +// topology constraint that Fluxion's resource graph does not model rejected one +// or more of them). Without intervention the group would retry forever against +// the same cached allocation while the Fluxion reservation leaked, because +// PreFilter short-circuits on the cache and nothing else releases it on a +// scheduling failure. +// +// We react by abandoning the failed allocation: the ENTIRE cached node set is +// added to the group's exclusion set, the Fluxion jobids are cancelled, and the +// cached placement is deleted. The next PreFilter for the group re-matches with +// an RFC 31 negated-hostlist constraint over the accumulated exclusion set, so +// Fluxion is forced onto untried nodes. We exclude the whole set (not just the +// individually-rejected nodes) deliberately: if the group as a whole could not +// be admitted, a node that happened to survive this round carries no guarantee +// for the next, and excluding the whole set makes each retry a strictly smaller, +// monotonic search that converges — either to a feasible allocation on untried +// nodes, or to a clean no-match (Unschedulable) once the graph is exhausted, at +// which point the pod waits for a cluster-state change rather than busy-looping. +func (f *Fluence) PostFilter( + ctx context.Context, + state fwk.CycleState, + pod *corev1.Pod, + filteredNodeStatusMap fwk.NodeToStatusReader, +) (*fwk.PostFilterResult, *fwk.Status) { + group := groupKey(pod) + + f.mu.Lock() + alloc, ok := f.placement[group] + if !ok { + // No cached allocation for this group — nothing of ours to reconcile. + // (Another plugin's PostFilter, or a non-group pod.) + f.mu.Unlock() + return nil, fwk.NewStatus(fwk.Unschedulable) + } + // Accumulate the whole failed allocation's nodes into the exclusion set. + if f.excludedNodes[group] == nil { + f.excludedNodes[group] = map[string]bool{} + } + for _, n := range alloc.place.Nodes { + f.excludedNodes[group][n] = true + } + excludedCount := len(f.excludedNodes[group]) + jobids := alloc.jobids + delete(f.placement, group) + f.mu.Unlock() + + // Release the Fluxion reservation for the abandoned allocation so the graph + // does not leak it while the group retries. + f.cancelJobids(jobids) + + log.Printf("[fluence] group %s unschedulable: abandoning allocation (nodes %v, "+ + "jobids %v); %d node(s) now excluded, will re-match on next cycle", + group, alloc.place.Nodes, jobids, excludedCount) + + // Returning Unschedulable (no nominated node) lets the pod be requeued; the + // next PreFilter re-matches with the enlarged exclusion set. We do not + // nominate a node — Fluxion, not PostFilter preemption, chooses the next + // placement. + return nil, fwk.NewStatus(fwk.Unschedulable) +} + // PreBindPreFlight runs before PreBind. It returns Success when we have a cached // allocation for the pod's group (so PreBind can record the jobid, and stamp the // backend for a quantum pod), and Skip otherwise. @@ -718,6 +799,7 @@ func (f *Fluence) cancelGroup(key string, ann map[string]string) { f.mu.Lock() delete(f.placement, key) + delete(f.excludedNodes, key) // drop accumulated exclusions so a future group reusing the name starts clean f.mu.Unlock() } diff --git a/pkg/fluence/fluence_test.go b/pkg/fluence/fluence_test.go index 998e1a7..6a53b56 100644 --- a/pkg/fluence/fluence_test.go +++ b/pkg/fluence/fluence_test.go @@ -1,6 +1,7 @@ package fluence import ( + "context" "errors" "testing" @@ -12,6 +13,7 @@ import ( schedv1a2 "k8s.io/api/scheduling/v1alpha2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" + fwk "k8s.io/kube-scheduler/framework" ) // fakeMatcher records Cancel calls so cancel behavior can be asserted without @@ -46,7 +48,11 @@ func (m *fakeMatcher) Cancel(jobid uint64) error { } func newTestFluence(m matcher) *Fluence { - return &Fluence{matcher: m, placement: map[string]groupAlloc{}} + return &Fluence{ + matcher: m, + placement: map[string]groupAlloc{}, + excludedNodes: map[string]map[string]bool{}, + } } func ann(jobid string) map[string]string { @@ -345,3 +351,103 @@ func twoSpecs() []*jobspec.Jobspec { {Version: 9999}, } } + +// --- PostFilter allocation reconciliation ----------------------------------- + +// PostFilter must abandon a group's failed allocation: add the WHOLE cached node +// set to the exclusion set, cancel the Fluxion jobids, and delete the cache, so +// the next PreFilter re-matches onto untried nodes. +func TestPostFilterAbandonsAndExcludesWholeNodeSet(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + key := "default/training" + f.placement[key] = groupAlloc{ + place: placement.Placement{Nodes: []string{"node-a", "node-b", "node-c"}}, + jobids: []uint64{11, 12}, + } + pod := groupedPod("default", "training-0", "training", nil) + + _, status := f.PostFilter(context.Background(), nil, pod, nil) + if status == nil || status.Code() != fwk.Unschedulable { + t.Fatalf("expected Unschedulable status, got %v", status) + } + // cache cleared + if _, still := f.placement[key]; still { + t.Fatal("placement cache should be deleted after PostFilter") + } + // jobids cancelled + if len(m.cancelled) != 2 { + t.Fatalf("expected both jobids cancelled, got %v", m.cancelled) + } + // the WHOLE node set excluded + excl := f.excludedNodes[key] + for _, n := range []string{"node-a", "node-b", "node-c"} { + if !excl[n] { + t.Fatalf("expected %s excluded, set=%v", n, excl) + } + } + if len(excl) != 3 { + t.Fatalf("expected exactly 3 excluded nodes, got %v", excl) + } +} + +// Repeated failures accumulate monotonically: a second abandoned allocation adds +// its nodes to the existing exclusion set (the set only grows -> convergence). +func TestPostFilterAccumulatesAcrossAttempts(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + key := "default/training" + pod := groupedPod("default", "training-0", "training", nil) + + // attempt 1 fails on {a,b} + f.placement[key] = groupAlloc{place: placement.Placement{Nodes: []string{"node-a", "node-b"}}, jobids: []uint64{1}} + f.PostFilter(context.Background(), nil, pod, nil) + // attempt 2 (re-matched elsewhere) fails on {c,d} + f.placement[key] = groupAlloc{place: placement.Placement{Nodes: []string{"node-c", "node-d"}}, jobids: []uint64{2}} + f.PostFilter(context.Background(), nil, pod, nil) + + excl := f.excludedNodes[key] + for _, n := range []string{"node-a", "node-b", "node-c", "node-d"} { + if !excl[n] { + t.Fatalf("expected %s in accumulated exclusion set, got %v", n, excl) + } + } + if len(excl) != 4 { + t.Fatalf("exclusion set should accumulate to 4, got %v", excl) + } +} + +// PostFilter on a group with no cached allocation (not ours, or already cleared) +// is a safe no-op: no panic, no cancel, returns Unschedulable. +func TestPostFilterUnknownGroupNoop(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + pod := groupedPod("default", "stranger-0", "stranger", nil) + + _, status := f.PostFilter(context.Background(), nil, pod, nil) + if status == nil || status.Code() != fwk.Unschedulable { + t.Fatalf("expected Unschedulable, got %v", status) + } + if len(m.cancelled) != 0 { + t.Fatalf("nothing should be cancelled for an unknown group, got %v", m.cancelled) + } + if len(f.excludedNodes) != 0 { + t.Fatalf("no exclusion set should be created for an unknown group, got %v", f.excludedNodes) + } +} + +// Teardown (cancelGroup) must clear the exclusion set so a future group reusing +// the same key does not inherit stale exclusions. +func TestCancelGroupClearsExclusions(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + key := "default/training" + f.placement[key] = groupAlloc{jobids: []uint64{9}} + f.excludedNodes[key] = map[string]bool{"node-a": true} + + f.cancelGroup(key, ann("9")) + + if _, still := f.excludedNodes[key]; still { + t.Fatal("exclusion set should be cleared on teardown") + } +} diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index 554f319..c7f76de 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -214,14 +214,36 @@ func withEntries(counts map[string]int) []jobspec.Resource { // allocation (duration 0 runs to graph end) plus an RFC 31 property constraint // selecting the eligible node set. properties is the AND-set of composed // key=value property strings a matched node must carry. -func systemAttributes(properties []string) map[string]interface{} { +func systemAttributes(properties []string, excludeNodes []string) map[string]interface{} { + // Base property constraint (the eligible-node property AND-set). + constraints := map[string]interface{}{ + "properties": properties, + } + // When a group has had a placement rejected by other scheduler plugins + // (taints, affinity, volume topology that Fluxion's graph does not model), + // PostFilter accumulates the rejected hostnames and we AND in an RFC 31 + // negated hostlist so the re-match is forced onto untried nodes. RFC 31 is + // JsonLogic-style ({operator:[values]}, one operator per object), so to AND + // two operators we nest them under an explicit `and`. We only do this when + // there is something to exclude, so the no-exclusion jobspec is byte-for-byte + // what it was before (and existing tests/behavior are unchanged). + if len(excludeNodes) > 0 { + constraints = map[string]interface{}{ + "and": []interface{}{ + map[string]interface{}{"properties": properties}, + map[string]interface{}{ + "not": []interface{}{ + map[string]interface{}{"hostlist": excludeNodes}, + }, + }, + }, + } + } return map[string]interface{}{ "system": map[string]interface{}{ // duration 0 => hold the allocation until we explicitly Cancel. - "duration": 0, - "constraints": map[string]interface{}{ - "properties": properties, - }, + "duration": 0, + "constraints": constraints, }, } } @@ -229,7 +251,7 @@ func systemAttributes(properties []string) map[string]interface{} { // computeJobspec builds the physical-compute jobspec for a group: one slot per // pod holding the compute resources, constrained to virtual=false nodes. This is // the only jobspec for a group that requests no virtual devices. -func computeJobspec(groupName string, slots int, compute map[string]int) *jobspec.Jobspec { +func computeJobspec(groupName string, slots int, compute map[string]int, excludeNodes []string) *jobspec.Jobspec { return &jobspec.Jobspec{ Version: 9999, Resources: []jobspec.Resource{{ @@ -238,7 +260,7 @@ func computeJobspec(groupName string, slots int, compute map[string]int) *jobspe Label: "default", With: withEntries(compute), }}, - Attributes: systemAttributes([]string{VirtualPropertyFalse}), + Attributes: systemAttributes([]string{VirtualPropertyFalse}, excludeNodes), Tasks: []jobspec.Task{{ Command: []string{groupName}, Slot: "default", @@ -272,7 +294,7 @@ func deviceJobspec(groupName, deviceType string, count int, extraProps []string) Label: "device", With: []jobspec.Resource{{Type: "node", Count: count}}, }}, - Attributes: systemAttributes(props), + Attributes: systemAttributes(props, nil), Tasks: []jobspec.Task{{ Command: []string{groupName}, Slot: "device", @@ -299,6 +321,7 @@ func JobspecsForGroup( groupName string, pods []corev1.Pod, knownDevices map[string]bool, + excludeNodes []string, ) ([]*jobspec.Jobspec, error) { if len(pods) == 0 { return nil, fmt.Errorf("pod group %q has no pods", groupName) @@ -321,7 +344,7 @@ func JobspecsForGroup( } } - specs := []*jobspec.Jobspec{computeJobspec(groupName, len(pods), compute)} + specs := []*jobspec.Jobspec{computeJobspec(groupName, len(pods), compute, excludeNodes)} // Deterministic device order for stable output. deviceTypes := make([]string, 0, len(devices)) diff --git a/pkg/placement/placement_test.go b/pkg/placement/placement_test.go index 33786c8..fe68917 100644 --- a/pkg/placement/placement_test.go +++ b/pkg/placement/placement_test.go @@ -64,7 +64,7 @@ func TestClassicalSingleMatch(t *testing.T) { podWith("p0", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), podWith("p1", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), } - specs, err := JobspecsForGroup("grp", pods, nil) + specs, err := JobspecsForGroup("grp", pods, nil, nil) if err != nil { t.Fatal(err) } @@ -101,7 +101,7 @@ func TestGroupDeviceMatchWhenLeaderNotFirst(t *testing.T) { }) // Leader deliberately placed last. pods := []corev1.Pod{worker, worker, leader} - specs, err := JobspecsForGroup("qgrp", pods, map[string]bool{"qpu": true}) + specs, err := JobspecsForGroup("qgrp", pods, map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -132,7 +132,7 @@ func qpuPodWithRequires(name string, requires map[string]string) corev1.Pod { // constraints, nothing extra (over-constraining would break unconstrained runs). func TestNoRequireAnnotationsAddsNoConstraints(t *testing.T) { p := qpuPodWithRequires("q", nil) - specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}) + specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -145,7 +145,7 @@ func TestNoRequireAnnotationsAddsNoConstraints(t *testing.T) { // Exactly one require- constraint. func TestSingleRequireConstraint(t *testing.T) { p := qpuPodWithRequires("q", map[string]string{"qrmi_type": "braket-gate"}) - specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}) + specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -169,7 +169,7 @@ func TestMultipleRequireConstraintsAreDeduped(t *testing.T) { // a worker that happens to repeat one of the same require- annotations worker := qpuPodWithRequires("w0", map[string]string{"vendor": "amazon"}) specs, err := JobspecsForGroup("g", []corev1.Pod{leader, worker}, - map[string]bool{"qpu": true}) + map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -211,7 +211,7 @@ func TestRequireAnnotationConstrainsDevice(t *testing.T) { leader.Annotations[RequireAnnotationPrefix+"vendor"] = "amazon" specs, err := JobspecsForGroup("qgrp", []corev1.Pod{leader}, - map[string]bool{"qpu": true}) + map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -232,7 +232,7 @@ func TestDeviceProducesSecondMatch(t *testing.T) { FluxionResourcePrefix + "qpu": qty(1), }) known := map[string]bool{"qpu": true} - specs, err := JobspecsForGroup("qgrp", []corev1.Pod{p}, known) + specs, err := JobspecsForGroup("qgrp", []corev1.Pod{p}, known, nil) if err != nil { t.Fatal(err) } @@ -274,7 +274,7 @@ func TestDeviceProducesSecondMatch(t *testing.T) { // node), so there are two matches: compute (core=1, virtual=false) and device. func TestDeviceOnlyStillForcesCompute(t *testing.T) { p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qpu": qty(1)}) - specs, err := JobspecsForGroup("qonly", []corev1.Pod{p}, map[string]bool{"qpu": true}) + specs, err := JobspecsForGroup("qonly", []corev1.Pod{p}, map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -289,7 +289,7 @@ func TestDeviceOnlyStillForcesCompute(t *testing.T) { // Requesting a device type the graph does not model is a hard error. func TestUnknownDeviceErrors(t *testing.T) { p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "fpga": qty(1)}) - _, err := JobspecsForGroup("grp", []corev1.Pod{p}, map[string]bool{"qpu": true}) + _, err := JobspecsForGroup("grp", []corev1.Pod{p}, map[string]bool{"qpu": true}, nil) if err == nil { t.Fatal("expected an error for an unmodeled device type") } @@ -301,7 +301,7 @@ func TestHoldDurationZero(t *testing.T) { corev1.ResourceCPU: qty(1), FluxionResourcePrefix + "qpu": qty(1), }) - specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}) + specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}, nil) if err != nil { t.Fatal(err) } @@ -366,3 +366,76 @@ func TestPlacementUnmarkedNodeIsCompute(t *testing.T) { t.Fatalf("unmarked node should not be a backend, got %q", p.Backend) } } + +// When excludeNodes is non-empty, the compute jobspec's constraint must AND the +// base properties with an RFC 31 negated hostlist, so a re-match avoids the +// rejected nodes. When empty, the constraint must be the plain properties form +// (byte-for-byte the pre-exclusion behavior). +func TestExcludeNodesAddsNegatedHostlist(t *testing.T) { + p := podWith("p", corev1.ResourceList{corev1.ResourceCPU: qty(1)}) + + // no exclusion -> plain properties, no `and`/`not` + specs, err := JobspecsForGroup("g", []corev1.Pod{p}, nil, nil) + if err != nil { + t.Fatal(err) + } + cons := computeConstraints(t, specs[0]) + if _, hasAnd := cons["and"]; hasAnd { + t.Fatalf("no-exclusion constraint must not use `and`: %#v", cons) + } + if _, hasProps := cons["properties"]; !hasProps { + t.Fatalf("no-exclusion constraint must have plain properties: %#v", cons) + } + + // with exclusion -> and[ properties, not[ hostlist ] ] + specs, err = JobspecsForGroup("g", []corev1.Pod{p}, nil, []string{"node-b", "node-c"}) + if err != nil { + t.Fatal(err) + } + cons = computeConstraints(t, specs[0]) + andTerms, ok := cons["and"].([]interface{}) + if !ok || len(andTerms) != 2 { + t.Fatalf("exclusion constraint must be `and` of 2 terms: %#v", cons) + } + // find the not/hostlist term + foundHostlist := false + for _, term := range andTerms { + tm, _ := term.(map[string]interface{}) + notTerm, ok := tm["not"].([]interface{}) + if !ok || len(notTerm) == 0 { + continue + } + inner, _ := notTerm[0].(map[string]interface{}) + hl, ok := inner["hostlist"].([]string) + if !ok { + // json round-trip may make it []interface{}; accept both + if hlAny, ok2 := inner["hostlist"].([]interface{}); ok2 { + if len(hlAny) == 2 { + foundHostlist = true + } + } + continue + } + if len(hl) == 2 { + foundHostlist = true + } + } + if !foundHostlist { + t.Fatalf("exclusion constraint must contain not[hostlist[2 nodes]]: %#v", cons) + } +} + +// computeConstraints digs out attributes.system.constraints from the compute +// jobspec (the first spec; device specs do not carry node exclusions). +func computeConstraints(t *testing.T, spec *jobspec.Jobspec) map[string]interface{} { + t.Helper() + sys, ok := spec.Attributes["system"].(map[string]interface{}) + if !ok { + t.Fatalf("no system attributes: %#v", spec.Attributes) + } + cons, ok := sys["constraints"].(map[string]interface{}) + if !ok { + t.Fatalf("no constraints: %#v", sys) + } + return cons +} diff --git a/test/e2e/01-classical-gang.sh b/test/e2e/01-classical-gang.sh index d2018ac..ffe8fa8 100644 --- a/test/e2e/01-classical-gang.sh +++ b/test/e2e/01-classical-gang.sh @@ -27,3 +27,7 @@ count="$(kubectl get pods -l app=training --no-headers | wc -l | tr -d ' ')" log "PASS: classical gang placed all $count pods via fluence" kubectl delete -f examples/single-podgroup.yaml --wait=false || true kubectl patch podgroup training --type=merge -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true +# Wait for the pods to actually be gone before the next test runs — otherwise a +# terminating 'training' pod (same name/labels reused by other scenarios) can be +# misread as the next test's placement. +kubectl wait --for=delete pod -l app=training --timeout=60s 2>/dev/null || true diff --git a/test/e2e/05-postfilter-rematch.sh b/test/e2e/05-postfilter-rematch.sh new file mode 100644 index 0000000..1712ea9 --- /dev/null +++ b/test/e2e/05-postfilter-rematch.sh @@ -0,0 +1,112 @@ +#!/usr/bin/env bash +# PostFilter re-match: when another scheduler plugin (TaintToleration) rejects a +# node Fluxion allocated, Fluence must abandon that allocation, exclude the node, +# and re-match onto an untainted node. Safety: the gang's RUNNING pod must NEVER +# bind to the tainted node. +# +# This test is self-isolating: it uses its own workload name (pf-rematch) and +# labels, distinct from the other e2e scenarios, and ensures a clean slate first, +# so a pod left over (terminating) from a previous test can never be mistaken for +# this test's placement. It also ignores terminating pods when asserting. +set -euo pipefail +HERE="$(cd "$(dirname "$0")" && pwd)"; . "${HERE}/lib.sh" + +NAME=pf-rematch +SEL="app=${NAME}" + +log "TEST 5: PostFilter abandons a taint-rejected allocation and re-matches" + +# --- clean slate: no leftover pods from earlier tests under our name ---------- +kubectl delete deployment "$NAME" --ignore-not-found >/dev/null 2>&1 || true +kubectl delete podgroup "$NAME" --ignore-not-found >/dev/null 2>&1 || true +kubectl patch podgroup "$NAME" --type=merge \ + -p '{"metadata":{"finalizers":null}}' >/dev/null 2>&1 || true +kubectl wait --for=delete pod -l "$SEL" --timeout=60s >/dev/null 2>&1 || true + +TAINTED="$(kubectl get nodes -l '!node-role.kubernetes.io/control-plane' \ + -o jsonpath='{.items[0].metadata.name}')" +[ -n "$TAINTED" ] || fail "no worker node found to taint" +log "tainting node $TAINTED with fluence-e2e=blocked:NoSchedule" +kubectl taint nodes "$TAINTED" fluence-e2e=blocked:NoSchedule --overwrite + +cleanup() { + kubectl taint nodes "$TAINTED" fluence-e2e- 2>/dev/null || true + kubectl delete deployment "$NAME" --ignore-not-found --wait=false 2>/dev/null || true + kubectl delete podgroup "$NAME" --ignore-not-found --wait=false 2>/dev/null || true + kubectl patch podgroup "$NAME" --type=merge \ + -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true +} +trap cleanup EXIT + +# --- our own workload (distinct name/labels; does NOT tolerate the taint) ------ +kubectl apply -f - <" for empty fields, so an empty deletionTimestamp + # shows as "", NOT "". Treat "" as empty for both columns. + if [ "$deleted" != "" ] && [ -n "$deleted" ]; then continue; fi # skip terminating + if [ "$node" = "" ] || [ -z "$node" ]; then continue; fi # skip not-yet-bound + checked=$((checked+1)) + if [ "$node" = "$TAINTED" ]; then + fail "SAFETY VIOLATION: running pod $name is bound to the tainted node $TAINTED" + fi + log "$name correctly placed on $node (not the tainted $TAINTED)" +done < <(kubectl get pods -l "$SEL" \ + -o custom-columns='N:.metadata.name,NODE:.spec.nodeName,DEL:.metadata.deletionTimestamp' \ + --no-headers) + +[ "$checked" -ge 1 ] || fail "no running ${NAME} pod found to check" + +# Informational: did PostFilter actually fire (Fluxion picked the tainted node +# first and we re-matched), or did Fluxion place on the good node directly? +POD="$(kubectl -n kube-system get pods -l app=fluence \ + -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)" +if [ -n "$POD" ] && kubectl -n kube-system logs "$POD" 2>/dev/null \ + | grep -q "unschedulable: abandoning allocation"; then + log "observed PostFilter abandonment in scheduler log (re-match path exercised)" +else + log "note: Fluxion placed on the untainted node directly this run (PostFilter not needed)" +fi + +log "PASS: gang scheduled on an untainted node; no running pod on the tainted node" From dd72a7c219a75f9128fe8b2183fa219a563af37f Mon Sep 17 00:00:00 2001 From: vsoch Date: Fri, 26 Jun 2026 14:59:37 -0700 Subject: [PATCH 2/2] gang: multicount tests Signed-off-by: vsoch --- .github/workflows/e2e-tests.yaml | 3 + examples/multi-gang-contention.yaml | 27 +++++ pkg/webhook/handler.go | 2 +- pkg/webhook/handlers/gang.go | 53 +++++++++- pkg/webhook/handlers/gang_test.go | 154 ++++++++++++++++++++++++++++ pkg/webhook/webhook.go | 20 +++- test/e2e/06-multi-gang.sh | 67 ++++++++++++ 7 files changed, 320 insertions(+), 6 deletions(-) create mode 100644 examples/multi-gang-contention.yaml create mode 100644 pkg/webhook/handlers/gang_test.go create mode 100644 test/e2e/06-multi-gang.sh diff --git a/.github/workflows/e2e-tests.yaml b/.github/workflows/e2e-tests.yaml index 6ab7613..9c83d9b 100644 --- a/.github/workflows/e2e-tests.yaml +++ b/.github/workflows/e2e-tests.yaml @@ -90,6 +90,9 @@ jobs: - name: E2E - classical gang run: bash test/e2e/01-classical-gang.sh + - name: E2E - multi-pod gang (all-or-nothing + contention) + run: bash test/e2e/06-multi-gang.sh + - name: E2E - PostFilter re-match (taint-rejected allocation) run: bash test/e2e/05-postfilter-rematch.sh diff --git a/examples/multi-gang-contention.yaml b/examples/multi-gang-contention.yaml new file mode 100644 index 0000000..6aa439a --- /dev/null +++ b/examples/multi-gang-contention.yaml @@ -0,0 +1,27 @@ +# Multi-pod gang via the WEBHOOK path (the path the experiments use): pods carry +# the group LABEL + group-size annotation; the fluence webhook creates the +# PodGroup with minCount = group-size (3). All 3 must place or none. +apiVersion: apps/v1 +kind: Deployment +metadata: + name: gang3 +spec: + replicas: 3 + selector: + matchLabels: {app: gang3} + template: + metadata: + labels: + app: gang3 + fluence.flux-framework.org/group: gang3 + annotations: + fluence.flux-framework.org/group-size: "3" + spec: + schedulerName: fluence + containers: + - name: worker + image: busybox + command: ["sleep", "3600"] + resources: + requests: + cpu: "1" diff --git a/pkg/webhook/handler.go b/pkg/webhook/handler.go index 82a1227..d3ca5eb 100644 --- a/pkg/webhook/handler.go +++ b/pkg/webhook/handler.go @@ -28,7 +28,7 @@ type MutatorAPI interface { // PodGroup operations (gang scheduling). Group identity is the value of the // group label, which the core treats as an opaque string. PodGroupLeader(ctx context.Context, namespace, group string) string - EnsurePodGroup(ctx context.Context, namespace, group, leaderPod string) + EnsurePodGroup(ctx context.Context, namespace, group, leaderPod string, minCount int32) RecordLeader(ctx context.Context, namespace, group, leaderPod string) // EnsureSidecarRBAC provisions the per-namespace ServiceAccount/Role/Binding diff --git a/pkg/webhook/handlers/gang.go b/pkg/webhook/handlers/gang.go index a6c6126..015f51c 100644 --- a/pkg/webhook/handlers/gang.go +++ b/pkg/webhook/handlers/gang.go @@ -2,11 +2,14 @@ package handlers import ( "context" + "log" + "strconv" "github.com/converged-computing/fluence/pkg/webhook" "github.com/converged-computing/fluence/pkg/webhook/spec" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func init() { @@ -31,12 +34,60 @@ func (h *gangHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod *cor // First pod admitted in the group creates the PodGroup and is recorded as // the admission-order leader. All pods are linked to the group. if m.PodGroupLeader(ctx, pod.Namespace, g) == "" { - m.EnsurePodGroup(ctx, pod.Namespace, g, pod.Name) + // minCount = full gang size N (leader + workers) from the group-size + // annotation, so the whole group schedules atomically. NOT expected-workers + // (that is N-1, the workers the sidecar ungates). Absent -> 1. + m.EnsurePodGroup(ctx, pod.Namespace, g, pod.Name, resolveMinCount(ctx, m, pod)) m.RecordLeader(ctx, pod.Namespace, g, pod.Name) } return schedulingGroupOps(pod, g) } +// resolveMinCount determines the gang's atomic-schedule size N: +// 1. explicit group-size annotation -> honor it verbatim. This is the override +// for when minCount must differ from the parent's replica count (e.g. the +// quantum leader/worker split, where the gang's N is expressed directly). +// 2. otherwise derive from the OWNING object: a Flux Operator MiniCluster pod +// is owned by an indexed Job whose parallelism == completions == size == N. +// (The operator sets Parallelism = Completions = MiniCluster.Spec.Size.) +// 3. otherwise default to 1, logged — never silently size a multi-pod gang to 1. +// +// The leader/worker (quantum) split is orthogonal and unchanged: it is driven by +// RoleAnnotation / QuantumResource in the quantum handler. minCount is always the +// FULL gang N regardless of which pods get gated. +func resolveMinCount(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod) int32 { + // 1. explicit override + if pod.Annotations != nil { + if n := pod.Annotations[webhook.GroupSizeAnnotation]; n != "" { + if v, err := strconv.Atoi(n); err == nil && v > 0 { + return int32(v) + } + } + } + // 2. derive from the owning Job's parallelism + if c := m.Client(); c != nil { + for _, ref := range pod.OwnerReferences { + if ref.Kind != "Job" { + continue + } + job, err := c.BatchV1().Jobs(pod.Namespace).Get(ctx, ref.Name, metav1.GetOptions{}) + if err != nil { + break + } + if job.Spec.Parallelism != nil && *job.Spec.Parallelism > 0 { + return *job.Spec.Parallelism + } + if job.Spec.Completions != nil && *job.Spec.Completions > 0 { + return *job.Spec.Completions + } + } + } + // 3. no signal: a single-pod gang. Log so a missing size on a multi-pod + // workload is visible rather than a silent gang-of-1. + log.Printf("[fluence-webhook] group %s: no group-size annotation and no owning Job parallelism; defaulting minCount=1", webhook.GroupName(pod)) + return 1 +} + // schedulingGroupOps links a pod to its PodGroup via the native 1.36 field // spec.schedulingGroup.podGroupName. Idempotent if already linked. func schedulingGroupOps(pod *corev1.Pod, group string) []spec.Op { diff --git a/pkg/webhook/handlers/gang_test.go b/pkg/webhook/handlers/gang_test.go new file mode 100644 index 0000000..77f7f46 --- /dev/null +++ b/pkg/webhook/handlers/gang_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2024 Lawrence Livermore National Security, LLC + (c.f. AUTHORS, NOTICE.LLNS, COPYING) +SPDX-License-Identifier: Apache-2.0 +*/ + +// Tests for gang PodGroup minCount: the whole gang (full N) must schedule +// atomically. Regression guard for the bug where every PodGroup was created +// with minCount=1, so a multi-pod gang was "satisfied" by a single pod and the +// rest were stranded (partial placement). +package handlers + +import ( + "context" + "testing" + + "strconv" + + "github.com/converged-computing/fluence/pkg/webhook" + + corev1 "k8s.io/api/core/v1" + + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" +) + +// minCountOf runs the gang handler for the leader pod of a group and returns the +// minCount of the PodGroup the webhook created. +func minCountOf(t *testing.T, pod *corev1.Pod) int32 { + t.Helper() + m := &webhook.Mutator{Clientset: fake.NewSimpleClientset()} + m.Mutate(context.Background(), pod) + pg, err := m.Clientset.SchedulingV1alpha2(). + PodGroups(pod.Namespace).Get(context.Background(), webhook.GroupName(pod), metav1.GetOptions{}) + if err != nil { + t.Fatalf("PodGroup not created: %v", err) + } + if pg.Spec.SchedulingPolicy.Gang == nil { + t.Fatal("PodGroup has no gang scheduling policy") + } + return pg.Spec.SchedulingPolicy.Gang.MinCount +} + +// minCountWithClient runs the gang handler with a pre-seeded clientset (so the +// owning Job exists) and returns the created PodGroup's minCount. +func minCountWithClient(t *testing.T, pod *corev1.Pod, objs ...interface{}) int32 { + t.Helper() + cs := fake.NewSimpleClientset(toRuntime(objs)...) + m := &webhook.Mutator{Clientset: cs} + m.Mutate(context.Background(), pod) + pg, err := cs.SchedulingV1alpha2().PodGroups(pod.Namespace). + Get(context.Background(), webhook.GroupName(pod), metav1.GetOptions{}) + if err != nil { + t.Fatalf("PodGroup not created: %v", err) + } + return pg.Spec.SchedulingPolicy.Gang.MinCount +} + +func jobWithParallelism(ns, name string, n int32) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns}, + Spec: batchv1.JobSpec{Parallelism: &n, Completions: &n}, + } +} + +func ownedBy(pod *corev1.Pod, kind, name string) { + pod.OwnerReferences = append(pod.OwnerReferences, + metav1.OwnerReference{Kind: kind, Name: name}) +} + +// No annotation, but the pod is owned by an indexed Job with parallelism N +// (the Flux Operator MiniCluster case: Parallelism == Completions == size == N). +// minCount must come from the Job. +func TestGangMinCountDerivedFromOwningJob(t *testing.T) { + pod := cpuPod("fluence") + pod.Namespace = "default" + pod.Labels = map[string]string{webhook.GroupLabel: "mc-gang"} + ownedBy(pod, "Job", "mc-gang-job") + got := minCountWithClient(t, pod, jobWithParallelism("default", "mc-gang-job", 4)) + if got != 4 { + t.Errorf("owner-derived: minCount=%d, want 4 (from Job parallelism)", got) + } +} + +// The explicit annotation OVERRIDES the owning Job's parallelism (the override +// exists precisely because minCount may differ from the parent replica count). +func TestGangMinCountAnnotationOverridesOwner(t *testing.T) { + pod := cpuPod("fluence") + pod.Namespace = "default" + pod.Labels = map[string]string{webhook.GroupLabel: "ovr-gang"} + pod.Annotations = map[string]string{webhook.GroupSizeAnnotation: "2"} + ownedBy(pod, "Job", "ovr-gang-job") + got := minCountWithClient(t, pod, jobWithParallelism("default", "ovr-gang-job", 8)) + if got != 2 { + t.Errorf("annotation override: minCount=%d, want 2 (annotation wins over Job=8)", got) + } +} + +// A classical gang of size N must get minCount = N so the whole group schedules +// atomically (this is the core multi-gang fix). +func atoi32(s string) int32 { v, _ := strconv.Atoi(s); return int32(v) } + +func toRuntime(objs []interface{}) []runtime.Object { + out := make([]runtime.Object, 0, len(objs)) + for _, o := range objs { + if ro, ok := o.(runtime.Object); ok { + out = append(out, ro) + } + } + return out +} + +func TestGangMinCountEqualsGroupSize(t *testing.T) { + for _, n := range []string{"2", "4", "8"} { + pod := cpuPod("fluence") + pod.Namespace = "default" + pod.Labels = map[string]string{webhook.GroupLabel: "g-" + n} + pod.Annotations = map[string]string{webhook.GroupSizeAnnotation: n} + got := minCountOf(t, pod) + want := atoi32(n) + if got != want { + t.Errorf("group-size=%s: minCount=%d, want %d", n, got, want) + } + } +} + +// No group-size annotation -> minCount falls back to 1 (single-pod gang). +func TestGangMinCountDefaultsToOne(t *testing.T) { + pod := cpuPod("fluence") + pod.Namespace = "default" + pod.Labels = map[string]string{webhook.GroupLabel: "g-default"} + if got := minCountOf(t, pod); got != 1 { + t.Errorf("absent group-size: minCount=%d, want 1", got) + } +} + +// Quantum distinction: a gang of full size N=4 that ALSO carries +// expected-workers=3 (the N-1 workers the sidecar ungates) must still get +// minCount=4 (the whole gang), NOT 3. minCount comes from group-size, not +// expected-workers. +func TestGangMinCountHonorsFullNWithQuantumSplit(t *testing.T) { + pod := cpuPod("fluence") + pod.Namespace = "default" + pod.Labels = map[string]string{webhook.GroupLabel: "q-gang"} + pod.Annotations = map[string]string{ + webhook.GroupSizeAnnotation: "4", // full N (leader + workers) + webhook.ExpectedWorkersAnnotation: "3", // N-1 workers to ungate + } + if got := minCountOf(t, pod); got != 4 { + t.Errorf("quantum gang: minCount=%d, want 4 (full N, not N-1)", got) + } +} diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index 20a7288..45af6da 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -71,6 +71,13 @@ const ( // opaque string and ascribes no meaning to it beyond propagation. ExpectedWorkersAnnotation = "fluence.flux-framework.org/expected-workers" + // GroupSizeAnnotation is the FULL gang member count N (leader + workers), + // set by the workload on each pod. It drives the PodGroup gang minCount so the + // whole group schedules atomically. This is distinct from + // ExpectedWorkersAnnotation (N-1: the workers the sidecar ungates; the leader + // is not gated). For a classical gang with no leader/worker split, N = size. + GroupSizeAnnotation = "fluence.flux-framework.org/group-size" + // Sidecar/staging infrastructure (generic — not quantum-specific). SidecarImage = "ghcr.io/converged-computing/fluence-sidecar:latest" SidecarServiceAccount = "fluence-sidecar" @@ -159,8 +166,13 @@ func (m *Mutator) PodGroupLeader(ctx context.Context, namespace, group string) s return "" } -// EnsurePodGroup creates a Fluence-owned PodGroup (minCount:1) if absent. -func (m *Mutator) EnsurePodGroup(ctx context.Context, namespace, group, leaderPod string) { +// EnsurePodGroup creates a Fluence-owned PodGroup with gang minCount = the full +// gang size N (the whole group schedules atomically) if absent. minCount<=0 +// falls back to 1. +func (m *Mutator) EnsurePodGroup(ctx context.Context, namespace, group, leaderPod string, minCount int32) { + if minCount <= 0 { + minCount = 1 + } if m.Clientset == nil { return } @@ -179,14 +191,14 @@ func (m *Mutator) EnsurePodGroup(ctx context.Context, namespace, group, leaderPo }, Spec: schedulingv1alpha2.PodGroupSpec{ SchedulingPolicy: schedulingv1alpha2.PodGroupSchedulingPolicy{ - Gang: &schedulingv1alpha2.GangSchedulingPolicy{MinCount: 1}, + Gang: &schedulingv1alpha2.GangSchedulingPolicy{MinCount: minCount}, }, }, } if _, err := m.Clientset.SchedulingV1alpha2().PodGroups(namespace).Create(ctx, pg, metav1.CreateOptions{}); err != nil { log.Printf("[fluence-webhook] could not create PodGroup %s/%s: %v", namespace, group, err) } else { - log.Printf("[fluence-webhook] created PodGroup %s/%s (minCount=1)", namespace, group) + log.Printf("[fluence-webhook] created PodGroup %s/%s (minCount=%d)", namespace, group, minCount) } } diff --git a/test/e2e/06-multi-gang.sh b/test/e2e/06-multi-gang.sh new file mode 100644 index 0000000..dc538d1 --- /dev/null +++ b/test/e2e/06-multi-gang.sh @@ -0,0 +1,67 @@ +#!/usr/bin/env bash +# Multi-pod gang scheduling on real nodes. Guards the two failures that the +# single-pod 01 test could NOT catch (and that shipped a minCount=1 bug): +# A) a 3-pod gang must place ALL 3 (minCount must equal the gang size, not 1) +# B) under contention, a gang that cannot fully fit stays ENTIRELY pending — +# never partially placed (no stranded pods holding nodes). +set -euo pipefail +HERE="$(cd "$(dirname "$0")" && pwd)"; . "${HERE}/lib.sh" + +# ---- A) all-or-nothing placement of a 3-pod gang ------------------------------- +log "TEST 6A: multi-pod gang (3) places all-or-nothing" +kubectl apply -f examples/multi-gang.yaml + +# the webhook must have created the PodGroup with minCount = 3 (the bug set it to 1) +log "checking PodGroup minCount == 3 (set by webhook from group-size)" +for i in $(seq 1 30); do + mc="$(kubectl get podgroup gang3 -o jsonpath='{.spec.schedulingPolicy.gang.minCount}' 2>/dev/null || true)" + [ -n "$mc" ] && break; sleep 2 +done +[ "$mc" = "3" ] || fail "PodGroup gang3 minCount=$mc, want 3 (minCount=1 bug -> partial gangs)" + +log "waiting for all 3 gang pods to be Ready" +wait_pods_ready "app=gang3" 3 180 || fail "gang3 did not place all 3 pods (gang scheduling failed)" + +count="$(kubectl get pods -l app=gang3 --field-selector=status.phase=Running --no-headers | wc -l | tr -d ' ')" +[ "$count" = "3" ] || fail "expected 3 Running gang3 pods, got $count (partial placement)" +for p in $(kubectl get pods -l app=gang3 -o name); do + pod="${p#pod/}" + sched="$(kubectl get pod "$pod" -o jsonpath='{.spec.schedulerName}')" + [ "$sched" = "fluence" ] || fail "$pod not scheduled by fluence (got: $sched)" +done +log "PASS 6A: 3-pod gang placed atomically by fluence (minCount=3)" + +kubectl delete -f examples/multi-gang.yaml --wait=false || true +kubectl patch podgroup gang3 --type=merge -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true +kubectl wait --for=delete pod -l app=gang3 --timeout=60s 2>/dev/null || true + +# ---- B) contention: the gang that can't fully fit stays ENTIRELY pending -------- +log "TEST 6B: contention — a gang that cannot fully fit must NOT partially place" +kubectl apply -f examples/multi-gang-contention.yaml + +# wait until the cluster settles: exactly one gang fully Running, the other fully Pending. +log "waiting for one gang to win placement" +winner=""; loser="" +for i in $(seq 1 90); do + ra="$(kubectl get pods -l app=gangA --field-selector=status.phase=Running --no-headers 2>/dev/null | wc -l | tr -d ' ')" + rb="$(kubectl get pods -l app=gangB --field-selector=status.phase=Running --no-headers 2>/dev/null | wc -l | tr -d ' ')" + if [ "$ra" = "2" ] && [ "$rb" = "0" ]; then winner=gangA; loser=gangB; break; fi + if [ "$rb" = "2" ] && [ "$ra" = "0" ]; then winner=gangB; loser=gangA; break; fi + sleep 2 +done +[ -n "$winner" ] || fail "neither gang reached a clean 2/0 placement (check for partial placement)" +log "winner=$winner (2 running), loser=$loser (expected 0 running)" + +# the loser must have ZERO pods scheduled to a node — the all-or-nothing guarantee. +# A single scheduled loser pod = partial placement = the bug. +scheduled_loser="$(kubectl get pods -l app=$loser -o jsonpath='{range .items[*]}{.spec.nodeName}{"\n"}{end}' | grep -c . || true)" +[ "$scheduled_loser" = "0" ] || fail "$loser has $scheduled_loser pod(s) on a node — PARTIAL placement (gang violated)" +log "PASS 6B: $loser stayed entirely pending — no partial placement under contention" + +kubectl delete -f examples/multi-gang-contention.yaml --wait=false || true +for g in gangA gangB; do + kubectl patch podgroup $g --type=merge -p '{"metadata":{"finalizers":null}}' 2>/dev/null || true +done +kubectl wait --for=delete pod -l app=gangA --timeout=60s 2>/dev/null || true +kubectl wait --for=delete pod -l app=gangB --timeout=60s 2>/dev/null || true +log "PASS: multi-gang all-or-nothing verified"