From 6a11a53297f289b368ab5159887716aa98515f11 Mon Sep 17 00:00:00 2001 From: cquil11 Date: Fri, 12 Jun 2026 15:39:13 -0500 Subject: [PATCH] feat(ci): add priority label to preempt runners for high-priority sweeps When an org member adds 'priority' alongside a sweep label, setup cancels every other in-progress or queued run with jobs on the sweep's target runners (resolved through each runner's full label set, since SKU fleets share nodes across labels) and records them in the preempted-runs artifact. A restore-preempted job at the end re-runs each victim's failed jobs via gh run rerun --failed. Preemption is run-granular: GitHub has no per-job cancel API. --- .github/workflows/run-sweep.yml | 88 +++++++++++- AGENTS.md | 1 + utils/preempt_runners.py | 240 ++++++++++++++++++++++++++++++++ utils/test_preempt_runners.py | 83 +++++++++++ 4 files changed, 409 insertions(+), 3 deletions(-) create mode 100644 utils/preempt_runners.py create mode 100644 utils/test_preempt_runners.py diff --git a/.github/workflows/run-sweep.yml b/.github/workflows/run-sweep.yml index a473de04b..812d05c23 100644 --- a/.github/workflows/run-sweep.yml +++ b/.github/workflows/run-sweep.yml @@ -10,6 +10,7 @@ concurrency: github.event.label.name != 'full-sweep-enabled' && github.event.label.name != 'non-canary-full-sweep-enabled' && github.event.label.name != 'full-sweep-fail-fast' && + github.event.label.name != 'priority' && github.run_id || 'active' }} @@ -43,7 +44,8 @@ jobs: github.event.label.name == 'sweep-enabled' || github.event.label.name == 'full-sweep-enabled' || github.event.label.name == 'non-canary-full-sweep-enabled' || - github.event.label.name == 'full-sweep-fail-fast' + github.event.label.name == 'full-sweep-fail-fast' || + github.event.label.name == 'priority' ) steps: - name: Checkout code @@ -74,7 +76,8 @@ jobs: github.event.label.name == 'sweep-enabled' || github.event.label.name == 'full-sweep-enabled' || github.event.label.name == 'non-canary-full-sweep-enabled' || - github.event.label.name == 'full-sweep-fail-fast' + github.event.label.name == 'full-sweep-fail-fast' || + github.event.label.name == 'priority' ) ) || ( @@ -149,6 +152,56 @@ jobs: --ref "${{ github.ref }}" \ --workflow-id "run-sweep.yml" + - name: Authorize priority label + if: >- + github.event_name == 'pull_request' && + contains(github.event.pull_request.labels.*.name, 'priority') + env: + GH_TOKEN: ${{ secrets.REPO_PAT || github.token }} + run: | + adder=$(gh api "repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/timeline" \ + --paginate --jq '[.[] | select(.event == "labeled" and .label.name == "priority")] | last | .actor.login') + if [ -z "$adder" ] || [ "$adder" = "null" ]; then + echo "::error::Could not determine who added the 'priority' label" + exit 1 + fi + state=$(gh api "orgs/${{ github.repository_owner }}/memberships/${adder}" --jq .state 2>/dev/null || echo "none") + if [ "$state" != "active" ]; then + gh api -X DELETE "repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/labels/priority" || true + echo "::error::'priority' may only be used by ${{ github.repository_owner }} org members; label (added by ${adder}) has been removed" + exit 1 + fi + echo "'priority' label authorized: added by ${adder} (${{ github.repository_owner }} org member)" + + - name: Preempt runners (priority) + if: >- + github.event_name == 'pull_request' && + contains(github.event.pull_request.labels.*.name, 'priority') + env: + GH_TOKEN: ${{ secrets.REPO_PAT || github.token }} + SEARCH_SPACE: ${{ steps.setup.outputs.search-space-config }} + run: | + TARGET_LABELS=$(jq -r '[.. | objects | .runner? // empty] | unique | join(",")' <<<"$SEARCH_SPACE") + if [ -z "$TARGET_LABELS" ]; then + echo "Search space names no runners; nothing to preempt." + echo "[]" > preempted_runs.json + exit 0 + fi + python3 "${GITHUB_WORKSPACE}/utils/preempt_runners.py" \ + --repo "${{ github.repository }}" \ + --self-run-id "${{ github.run_id }}" \ + --target-labels "$TARGET_LABELS" \ + --output preempted_runs.json + + - name: Upload preempted runs artifact + if: >- + github.event_name == 'pull_request' && + contains(github.event.pull_request.labels.*.name, 'priority') + uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 + with: + name: preempted-runs + path: preempted_runs.json + canary-select: needs: setup if: >- @@ -748,7 +801,8 @@ jobs: github.event.label.name == 'sweep-enabled' || github.event.label.name == 'full-sweep-enabled' || github.event.label.name == 'non-canary-full-sweep-enabled' || - github.event.label.name == 'full-sweep-fail-fast' + github.event.label.name == 'full-sweep-fail-fast' || + github.event.label.name == 'priority' ) runs-on: ubuntu-latest permissions: @@ -771,3 +825,31 @@ jobs: issue_number: context.issue.number, body, }); + + restore-preempted: + needs: [setup, collect-results, collect-evals] + if: >- + always() && + github.event_name == 'pull_request' && + contains(github.event.pull_request.labels.*.name, 'priority') && + needs.setup.result == 'success' + runs-on: ubuntu-latest + steps: + - name: Download preempted runs artifact + uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1 + with: + name: preempted-runs + + - name: Re-run failed jobs of preempted runs + env: + GH_TOKEN: ${{ secrets.REPO_PAT || github.token }} + run: | + count=$(jq 'length' preempted_runs.json) + echo "Restoring ${count} preempted run(s)" + jq -r '.[].run_id' preempted_runs.json | while read -r id; do + if gh run rerun "$id" --failed --repo "${{ github.repository }}"; then + echo "Re-ran failed jobs of run ${id}" + else + echo "::warning::Could not re-run failed jobs of run ${id}; restore manually with: gh run rerun ${id} --failed --repo ${{ github.repository }}" + fi + done diff --git a/AGENTS.md b/AGENTS.md index d77c76a21..83982398b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -65,6 +65,7 @@ PRs do not run the sweep automatically - `run-sweep.yml` is gated on a label. Pi - `full-sweep-enabled` - runs the full intermediate concurrency sweep behind a sequential single-node canary gate. Use when intermediate points matter (e.g. a recipe change shifts the throughput/latency curve, not just its endpoints). - `non-canary-full-sweep-enabled` - runs the full intermediate concurrency sweep without the canary gate. Use when the canary is flaky or not representative of the affected configuration. - `full-sweep-fail-fast` - runs the full intermediate concurrency sweep without the canary gate, but with `strategy.fail-fast` enabled on every matrix: the first failure in a matrix cancels that matrix's remaining jobs. Fail-fast is matrix-scoped, so the other matrices (1k1k vs 8k1k vs agentic vs evals) keep running and self-terminate on their own first failure; their completed results remain valid. The failing job keeps its red *failure* conclusion and the run concludes failed. Use when a failure means the rest of that matrix is wasted GPU time (e.g. new image bring-up). Note one flaky job kills its matrix's in-flight results. +- `priority` - modifier label, combined with one of the sweep labels above (it does not start a sweep by itself). Restricted to SemiAnalysisAI org members: the workflow resolves who added it from the issue timeline and strips the label otherwise. Before the sweep starts, every other in-progress or queued run with jobs on the sweep's target runners is cancelled (preemption is run-granular - GitHub has no per-job cancel API - so victim runs also lose their jobs on other SKUs), and the `restore-preempted` job at the end re-runs each victim's failed jobs via `gh run rerun --failed`. Preempted run IDs are saved in the `preempted-runs` artifact; if the priority run itself dies before `restore-preempted`, restore manually from that artifact. Reserve for day-0-style launches. **The sweep does not trigger while the PR has merge conflicts.** Even with `sweep-enabled`, `full-sweep-enabled`, `non-canary-full-sweep-enabled`, or `full-sweep-fail-fast` applied, the `run-sweep.yml` workflow will not start until the PR cleanly merges into main — a stale claude/* or update-* branch with a `perf-changelog.yaml` conflict (the common case) will sit in NO_SWEEP / NO_SUCCESS until rebased. Resolution recipe is documented in `KLAUD_DEBUG.md §1.1`: `git merge origin/main`, then `git checkout origin/main -- perf-changelog.yaml`, then re-append the PR's own changelog entry at the tail. Don't 3-way merge `perf-changelog.yaml`; whitespace edits silently re-trigger the deletion check. diff --git a/utils/preempt_runners.py b/utils/preempt_runners.py new file mode 100644 index 000000000..88c58228f --- /dev/null +++ b/utils/preempt_runners.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +"""Preempt GitHub Actions runs occupying a set of self-hosted runners. + +Used by ``run-sweep.yml`` when a PR carries the ``priority`` label: every +other in-progress or queued run with at least one job targeting the same +runners as the priority sweep is cancelled so the priority run gets the +fleet immediately. The cancelled run IDs are written to a JSON file +(uploaded as the ``preempted-runs`` artifact); the ``restore-preempted`` +job at the end of the priority run re-runs their failed jobs with +``gh run rerun --failed``. + +GitHub has no per-job cancel API, so preemption is run-granular: a victim +run is cancelled entirely even if only one of its jobs sits on a target +runner. The end-of-run restore brings back every non-successful job of +each victim, including that collateral. + +Runner matching: physical nodes carry several labels (e.g. a ``b200`` +node may also serve ``b200-dsv4`` and ``b200-multinode`` jobs), so an +in-progress job is matched through the runner it occupies (its full label +set, from the runners API) rather than the label it requested. Queued +jobs have no runner yet and are matched if some target runner could serve +them (requested labels are a subset of the runner's labels). If the token +cannot list runners, matching falls back to requested-label intersection. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +import urllib.error +import urllib.parse +import urllib.request +from typing import Any + +API_BASE = "https://api.github.com" +ACTIVE_JOB_STATUSES = {"queued", "in_progress", "waiting", "pending"} + + +def github_api( + repo: str, + path: str, + token: str, + params: dict[str, str] | None = None, + method: str = "GET", +) -> Any: + """Call the GitHub REST API and return decoded JSON (None for 204/empty).""" + query = f"?{urllib.parse.urlencode(params)}" if params else "" + request = urllib.request.Request( + f"{API_BASE}/repos/{repo}{path}{query}", + headers={ + "Accept": "application/vnd.github+json", + "Authorization": f"Bearer {token}", + "X-GitHub-Api-Version": "2022-11-28", + }, + method=method, + ) + try: + with urllib.request.urlopen(request, timeout=30) as response: + body = response.read().decode("utf-8") + return json.loads(body) if body else None + except urllib.error.HTTPError as exc: + body = exc.read().decode("utf-8", errors="replace") + raise RuntimeError(f"GitHub API {method} {path} failed: HTTP {exc.code}: {body}") from exc + + +def paginated_github_api( + repo: str, + path: str, + token: str, + item_key: str, + params: dict[str, str] | None = None, +) -> list[dict[str, Any]]: + """Fetch all pages from a GitHub REST list endpoint.""" + out: list[dict[str, Any]] = [] + page = 1 + while True: + page_params = {"per_page": "100", "page": str(page)} + if params: + page_params.update(params) + data = github_api(repo, path, token, page_params) + items = data.get(item_key, []) if isinstance(data, dict) else data + if not isinstance(items, list): + raise RuntimeError(f"GitHub API {path} returned an unexpected shape") + out.extend(items) + if len(items) < 100: + return out + page += 1 + + +def runner_labels_by_name(runners: list[dict[str, Any]]) -> dict[str, set[str]]: + """Map runner name -> the full set of labels it serves.""" + return { + str(runner["name"]): {str(label["name"]) for label in runner.get("labels", [])} + for runner in runners + if runner.get("name") + } + + +def job_matches_targets( + job: dict[str, Any], + target_labels: set[str], + runner_index: dict[str, set[str]] | None, +) -> bool: + """Whether a job occupies (or could grab) a runner serving a target label.""" + if job.get("status") not in ACTIVE_JOB_STATUSES: + return False + requested = {str(label) for label in job.get("labels") or []} + runner_name = job.get("runner_name") + if runner_index is not None: + if runner_name: + return bool(runner_index.get(str(runner_name), set()) & target_labels) + # Queued job: preempt it if any runner serving a target label could + # also serve this job, i.e. it competes for the freed capacity. + return any( + requested and requested <= labels + for labels in runner_index.values() + if labels & target_labels + ) + return bool(requested & target_labels) + + +def select_runs_to_preempt( + runs_with_jobs: list[tuple[dict[str, Any], list[dict[str, Any]]]], + target_labels: set[str], + runner_index: dict[str, set[str]] | None, + self_run_id: int, +) -> list[dict[str, Any]]: + """Pick the runs whose active jobs collide with the target runners.""" + selected = [] + for run, jobs in runs_with_jobs: + if int(run.get("id", 0)) == self_run_id: + continue + matching = [job for job in jobs if job_matches_targets(job, target_labels, runner_index)] + if matching: + selected.append( + { + "run_id": run["id"], + "workflow_name": run.get("name"), + "head_branch": run.get("head_branch"), + "event": run.get("event"), + "html_url": run.get("html_url"), + "matching_jobs": [job.get("name") for job in matching], + } + ) + return selected + + +def wait_for_completion(repo: str, token: str, run_ids: list[int], timeout_s: int = 180) -> None: + """Block until the cancelled runs complete and release their runners.""" + deadline = time.time() + timeout_s + pending = set(run_ids) + while pending and time.time() < deadline: + for run_id in sorted(pending): + run = github_api(repo, f"/actions/runs/{run_id}", token) + if run.get("status") == "completed": + pending.discard(run_id) + if pending: + time.sleep(10) + if pending: + print(f"::warning::Runs still not completed after {timeout_s}s: {sorted(pending)}") + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--repo", required=True) + parser.add_argument("--self-run-id", required=True, type=int) + parser.add_argument( + "--target-labels", + required=True, + help="Comma-separated runner labels the priority sweep needs (e.g. 'h100,h100-multinode').", + ) + parser.add_argument("--output", required=True, help="Path to write the preempted-runs JSON.") + parser.add_argument("--dry-run", action="store_true", help="Select and report, but do not cancel.") + args = parser.parse_args() + + token = os.environ.get("GH_TOKEN") or os.environ.get("GITHUB_TOKEN") + if not token: + raise RuntimeError("GH_TOKEN or GITHUB_TOKEN is required") + + target_labels = {label.strip() for label in args.target_labels.split(",") if label.strip()} + if not target_labels: + raise RuntimeError("--target-labels resolved to an empty set") + print(f"Target runner labels: {sorted(target_labels)}") + + runner_index: dict[str, set[str]] | None + try: + runners = paginated_github_api(args.repo, "/actions/runners", token, "runners") + runner_index = runner_labels_by_name(runners) + print(f"Resolved {len(runner_index)} self-hosted runners for label matching") + except RuntimeError as exc: + runner_index = None + print(f"::warning::Cannot list runners ({exc}); falling back to requested-label matching") + + runs: list[dict[str, Any]] = [] + for status in ("in_progress", "queued"): + runs.extend( + paginated_github_api( + args.repo, "/actions/runs", token, "workflow_runs", {"status": status} + ) + ) + + runs_with_jobs = [ + ( + run, + paginated_github_api( + args.repo, f"/actions/runs/{run['id']}/jobs", token, "jobs", {"filter": "latest"} + ), + ) + for run in runs + ] + + selected = select_runs_to_preempt(runs_with_jobs, target_labels, runner_index, args.self_run_id) + for entry in selected: + print( + f"Preempting run {entry['run_id']} ({entry['workflow_name']}, {entry['head_branch']}, " + f"{entry['event']}): jobs {entry['matching_jobs']}" + ) + if not selected: + print("No runs occupy the target runners; nothing to preempt.") + + if selected and not args.dry_run: + for entry in selected: + try: + github_api(args.repo, f"/actions/runs/{entry['run_id']}/cancel", token, method="POST") + except RuntimeError as exc: + # Already finished/cancelling is fine; the goal is a free runner. + print(f"::warning::Cancel of run {entry['run_id']} returned: {exc}") + wait_for_completion(args.repo, token, [entry["run_id"] for entry in selected]) + + with open(args.output, "w") as handle: + json.dump(selected, handle, indent=2) + print(f"Wrote {len(selected)} preempted run(s) to {args.output}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/utils/test_preempt_runners.py b/utils/test_preempt_runners.py new file mode 100644 index 000000000..a73ec2250 --- /dev/null +++ b/utils/test_preempt_runners.py @@ -0,0 +1,83 @@ +"""Tests for the preemption selection logic in preempt_runners.py.""" + +from preempt_runners import job_matches_targets, runner_labels_by_name, select_runs_to_preempt + +RUNNER_INDEX = { + "b200-dgxc_06": {"self-hosted", "b200", "b200-dsv4", "b200-dgxc", "b200-multinode"}, + "b200-dgxc_07": {"self-hosted", "b200", "b200-dsv4", "b200-dgxc", "b200-disagg"}, + "h100-dgxc-slurm_06": {"self-hosted", "h100", "h100-dgxc"}, +} + + +def job(status: str, labels: list[str], runner_name: str | None = None, name: str = "j") -> dict: + return {"status": status, "labels": labels, "runner_name": runner_name, "name": name} + + +def run(run_id: int, name: str = "Run Sweep", branch: str = "some-branch") -> dict: + return { + "id": run_id, + "name": name, + "head_branch": branch, + "event": "pull_request", + "html_url": f"https://example.com/runs/{run_id}", + } + + +class TestJobMatchesTargets: + def test_in_progress_job_matched_via_runner_label_set(self): + # The job requested b200-multinode but occupies a node that also + # serves b200 — a b200 priority sweep must preempt it. + j = job("in_progress", ["b200-multinode"], runner_name="b200-dgxc_06") + assert job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + def test_in_progress_job_on_unrelated_runner_not_matched(self): + j = job("in_progress", ["h100"], runner_name="h100-dgxc-slurm_06") + assert not job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + def test_completed_job_never_matched(self): + j = job("completed", ["b200"], runner_name="b200-dgxc_06") + assert not job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + def test_queued_job_matched_when_a_target_runner_could_serve_it(self): + # Queued b200-multinode job competes for the b200 nodes we free up. + j = job("queued", ["b200-multinode"]) + assert job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + def test_queued_job_not_matched_when_no_target_runner_serves_it(self): + j = job("queued", ["h100"]) + assert not job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + def test_fallback_without_runner_index_uses_requested_labels(self): + assert job_matches_targets(job("in_progress", ["b200"]), {"b200"}, None) + assert not job_matches_targets(job("in_progress", ["b200-multinode"]), {"b200"}, None) + + def test_ubuntu_jobs_never_matched(self): + j = job("in_progress", ["ubuntu-latest"]) + assert not job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + +class TestSelectRunsToPreempt: + def test_skips_self_run(self): + runs = [(run(1), [job("in_progress", ["b200"], "b200-dgxc_06")])] + assert select_runs_to_preempt(runs, {"b200"}, RUNNER_INDEX, self_run_id=1) == [] + + def test_selects_run_with_one_colliding_job(self): + runs = [ + (run(1), [job("in_progress", ["b200"], "b200-dgxc_06", name="bmk-b200")]), + (run(2), [job("in_progress", ["h100"], "h100-dgxc-slurm_06")]), + ] + selected = select_runs_to_preempt(runs, {"b200"}, RUNNER_INDEX, self_run_id=99) + assert [entry["run_id"] for entry in selected] == [1] + assert selected[0]["matching_jobs"] == ["bmk-b200"] + + def test_run_with_only_completed_jobs_not_selected(self): + runs = [(run(1), [job("completed", ["b200"], "b200-dgxc_06")])] + assert select_runs_to_preempt(runs, {"b200"}, RUNNER_INDEX, self_run_id=99) == [] + + +def test_runner_labels_by_name(): + runners = [ + {"name": "b200-dgxc_06", "labels": [{"name": "b200"}, {"name": "b200-multinode"}]}, + {"labels": [{"name": "ignored-no-name"}]}, + ] + assert runner_labels_by_name(runners) == {"b200-dgxc_06": {"b200", "b200-multinode"}}