diff --git a/.github/workflows/run-sweep.yml b/.github/workflows/run-sweep.yml index a473de04b..812d05c23 100644 --- a/.github/workflows/run-sweep.yml +++ b/.github/workflows/run-sweep.yml @@ -10,6 +10,7 @@ concurrency: github.event.label.name != 'full-sweep-enabled' && github.event.label.name != 'non-canary-full-sweep-enabled' && github.event.label.name != 'full-sweep-fail-fast' && + github.event.label.name != 'priority' && github.run_id || 'active' }} @@ -43,7 +44,8 @@ jobs: github.event.label.name == 'sweep-enabled' || github.event.label.name == 'full-sweep-enabled' || github.event.label.name == 'non-canary-full-sweep-enabled' || - github.event.label.name == 'full-sweep-fail-fast' + github.event.label.name == 'full-sweep-fail-fast' || + github.event.label.name == 'priority' ) steps: - name: Checkout code @@ -74,7 +76,8 @@ jobs: github.event.label.name == 'sweep-enabled' || github.event.label.name == 'full-sweep-enabled' || github.event.label.name == 'non-canary-full-sweep-enabled' || - github.event.label.name == 'full-sweep-fail-fast' + github.event.label.name == 'full-sweep-fail-fast' || + github.event.label.name == 'priority' ) ) || ( @@ -149,6 +152,56 @@ jobs: --ref "${{ github.ref }}" \ --workflow-id "run-sweep.yml" + - name: Authorize priority label + if: >- + github.event_name == 'pull_request' && + contains(github.event.pull_request.labels.*.name, 'priority') + env: + GH_TOKEN: ${{ secrets.REPO_PAT || github.token }} + run: | + adder=$(gh api "repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/timeline" \ + --paginate --jq '[.[] | select(.event == "labeled" and .label.name == "priority")] | last | .actor.login') + if [ -z "$adder" ] || [ "$adder" = "null" ]; then + echo "::error::Could not determine who added the 'priority' label" + exit 1 + fi + state=$(gh api "orgs/${{ github.repository_owner }}/memberships/${adder}" --jq .state 2>/dev/null || echo "none") + if [ "$state" != "active" ]; then + gh api -X DELETE "repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/labels/priority" || true + echo "::error::'priority' may only be used by ${{ github.repository_owner }} org members; label (added by ${adder}) has been removed" + exit 1 + fi + echo "'priority' label authorized: added by ${adder} (${{ github.repository_owner }} org member)" + + - name: Preempt runners (priority) + if: >- + github.event_name == 'pull_request' && + contains(github.event.pull_request.labels.*.name, 'priority') + env: + GH_TOKEN: ${{ secrets.REPO_PAT || github.token }} + SEARCH_SPACE: ${{ steps.setup.outputs.search-space-config }} + run: | + TARGET_LABELS=$(jq -r '[.. | objects | .runner? // empty] | unique | join(",")' <<<"$SEARCH_SPACE") + if [ -z "$TARGET_LABELS" ]; then + echo "Search space names no runners; nothing to preempt." + echo "[]" > preempted_runs.json + exit 0 + fi + python3 "${GITHUB_WORKSPACE}/utils/preempt_runners.py" \ + --repo "${{ github.repository }}" \ + --self-run-id "${{ github.run_id }}" \ + --target-labels "$TARGET_LABELS" \ + --output preempted_runs.json + + - name: Upload preempted runs artifact + if: >- + github.event_name == 'pull_request' && + contains(github.event.pull_request.labels.*.name, 'priority') + uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1 + with: + name: preempted-runs + path: preempted_runs.json + canary-select: needs: setup if: >- @@ -748,7 +801,8 @@ jobs: github.event.label.name == 'sweep-enabled' || github.event.label.name == 'full-sweep-enabled' || github.event.label.name == 'non-canary-full-sweep-enabled' || - github.event.label.name == 'full-sweep-fail-fast' + github.event.label.name == 'full-sweep-fail-fast' || + github.event.label.name == 'priority' ) runs-on: ubuntu-latest permissions: @@ -771,3 +825,31 @@ jobs: issue_number: context.issue.number, body, }); + + restore-preempted: + needs: [setup, collect-results, collect-evals] + if: >- + always() && + github.event_name == 'pull_request' && + contains(github.event.pull_request.labels.*.name, 'priority') && + needs.setup.result == 'success' + runs-on: ubuntu-latest + steps: + - name: Download preempted runs artifact + uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1 + with: + name: preempted-runs + + - name: Re-run failed jobs of preempted runs + env: + GH_TOKEN: ${{ secrets.REPO_PAT || github.token }} + run: | + count=$(jq 'length' preempted_runs.json) + echo "Restoring ${count} preempted run(s)" + jq -r '.[].run_id' preempted_runs.json | while read -r id; do + if gh run rerun "$id" --failed --repo "${{ github.repository }}"; then + echo "Re-ran failed jobs of run ${id}" + else + echo "::warning::Could not re-run failed jobs of run ${id}; restore manually with: gh run rerun ${id} --failed --repo ${{ github.repository }}" + fi + done diff --git a/AGENTS.md b/AGENTS.md index d77c76a21..83982398b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -65,6 +65,7 @@ PRs do not run the sweep automatically - `run-sweep.yml` is gated on a label. Pi - `full-sweep-enabled` - runs the full intermediate concurrency sweep behind a sequential single-node canary gate. Use when intermediate points matter (e.g. a recipe change shifts the throughput/latency curve, not just its endpoints). - `non-canary-full-sweep-enabled` - runs the full intermediate concurrency sweep without the canary gate. Use when the canary is flaky or not representative of the affected configuration. - `full-sweep-fail-fast` - runs the full intermediate concurrency sweep without the canary gate, but with `strategy.fail-fast` enabled on every matrix: the first failure in a matrix cancels that matrix's remaining jobs. Fail-fast is matrix-scoped, so the other matrices (1k1k vs 8k1k vs agentic vs evals) keep running and self-terminate on their own first failure; their completed results remain valid. The failing job keeps its red *failure* conclusion and the run concludes failed. Use when a failure means the rest of that matrix is wasted GPU time (e.g. new image bring-up). Note one flaky job kills its matrix's in-flight results. +- `priority` - modifier label, combined with one of the sweep labels above (it does not start a sweep by itself). Restricted to SemiAnalysisAI org members: the workflow resolves who added it from the issue timeline and strips the label otherwise. Before the sweep starts, every other in-progress or queued run with jobs on the sweep's target runners is cancelled (preemption is run-granular - GitHub has no per-job cancel API - so victim runs also lose their jobs on other SKUs), and the `restore-preempted` job at the end re-runs each victim's failed jobs via `gh run rerun --failed`. Preempted run IDs are saved in the `preempted-runs` artifact; if the priority run itself dies before `restore-preempted`, restore manually from that artifact. Reserve for day-0-style launches. **The sweep does not trigger while the PR has merge conflicts.** Even with `sweep-enabled`, `full-sweep-enabled`, `non-canary-full-sweep-enabled`, or `full-sweep-fail-fast` applied, the `run-sweep.yml` workflow will not start until the PR cleanly merges into main — a stale claude/* or update-* branch with a `perf-changelog.yaml` conflict (the common case) will sit in NO_SWEEP / NO_SUCCESS until rebased. Resolution recipe is documented in `KLAUD_DEBUG.md §1.1`: `git merge origin/main`, then `git checkout origin/main -- perf-changelog.yaml`, then re-append the PR's own changelog entry at the tail. Don't 3-way merge `perf-changelog.yaml`; whitespace edits silently re-trigger the deletion check. diff --git a/utils/preempt_runners.py b/utils/preempt_runners.py new file mode 100644 index 000000000..88c58228f --- /dev/null +++ b/utils/preempt_runners.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +"""Preempt GitHub Actions runs occupying a set of self-hosted runners. + +Used by ``run-sweep.yml`` when a PR carries the ``priority`` label: every +other in-progress or queued run with at least one job targeting the same +runners as the priority sweep is cancelled so the priority run gets the +fleet immediately. The cancelled run IDs are written to a JSON file +(uploaded as the ``preempted-runs`` artifact); the ``restore-preempted`` +job at the end of the priority run re-runs their failed jobs with +``gh run rerun --failed``. + +GitHub has no per-job cancel API, so preemption is run-granular: a victim +run is cancelled entirely even if only one of its jobs sits on a target +runner. The end-of-run restore brings back every non-successful job of +each victim, including that collateral. + +Runner matching: physical nodes carry several labels (e.g. a ``b200`` +node may also serve ``b200-dsv4`` and ``b200-multinode`` jobs), so an +in-progress job is matched through the runner it occupies (its full label +set, from the runners API) rather than the label it requested. Queued +jobs have no runner yet and are matched if some target runner could serve +them (requested labels are a subset of the runner's labels). If the token +cannot list runners, matching falls back to requested-label intersection. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +import urllib.error +import urllib.parse +import urllib.request +from typing import Any + +API_BASE = "https://api.github.com" +ACTIVE_JOB_STATUSES = {"queued", "in_progress", "waiting", "pending"} + + +def github_api( + repo: str, + path: str, + token: str, + params: dict[str, str] | None = None, + method: str = "GET", +) -> Any: + """Call the GitHub REST API and return decoded JSON (None for 204/empty).""" + query = f"?{urllib.parse.urlencode(params)}" if params else "" + request = urllib.request.Request( + f"{API_BASE}/repos/{repo}{path}{query}", + headers={ + "Accept": "application/vnd.github+json", + "Authorization": f"Bearer {token}", + "X-GitHub-Api-Version": "2022-11-28", + }, + method=method, + ) + try: + with urllib.request.urlopen(request, timeout=30) as response: + body = response.read().decode("utf-8") + return json.loads(body) if body else None + except urllib.error.HTTPError as exc: + body = exc.read().decode("utf-8", errors="replace") + raise RuntimeError(f"GitHub API {method} {path} failed: HTTP {exc.code}: {body}") from exc + + +def paginated_github_api( + repo: str, + path: str, + token: str, + item_key: str, + params: dict[str, str] | None = None, +) -> list[dict[str, Any]]: + """Fetch all pages from a GitHub REST list endpoint.""" + out: list[dict[str, Any]] = [] + page = 1 + while True: + page_params = {"per_page": "100", "page": str(page)} + if params: + page_params.update(params) + data = github_api(repo, path, token, page_params) + items = data.get(item_key, []) if isinstance(data, dict) else data + if not isinstance(items, list): + raise RuntimeError(f"GitHub API {path} returned an unexpected shape") + out.extend(items) + if len(items) < 100: + return out + page += 1 + + +def runner_labels_by_name(runners: list[dict[str, Any]]) -> dict[str, set[str]]: + """Map runner name -> the full set of labels it serves.""" + return { + str(runner["name"]): {str(label["name"]) for label in runner.get("labels", [])} + for runner in runners + if runner.get("name") + } + + +def job_matches_targets( + job: dict[str, Any], + target_labels: set[str], + runner_index: dict[str, set[str]] | None, +) -> bool: + """Whether a job occupies (or could grab) a runner serving a target label.""" + if job.get("status") not in ACTIVE_JOB_STATUSES: + return False + requested = {str(label) for label in job.get("labels") or []} + runner_name = job.get("runner_name") + if runner_index is not None: + if runner_name: + return bool(runner_index.get(str(runner_name), set()) & target_labels) + # Queued job: preempt it if any runner serving a target label could + # also serve this job, i.e. it competes for the freed capacity. + return any( + requested and requested <= labels + for labels in runner_index.values() + if labels & target_labels + ) + return bool(requested & target_labels) + + +def select_runs_to_preempt( + runs_with_jobs: list[tuple[dict[str, Any], list[dict[str, Any]]]], + target_labels: set[str], + runner_index: dict[str, set[str]] | None, + self_run_id: int, +) -> list[dict[str, Any]]: + """Pick the runs whose active jobs collide with the target runners.""" + selected = [] + for run, jobs in runs_with_jobs: + if int(run.get("id", 0)) == self_run_id: + continue + matching = [job for job in jobs if job_matches_targets(job, target_labels, runner_index)] + if matching: + selected.append( + { + "run_id": run["id"], + "workflow_name": run.get("name"), + "head_branch": run.get("head_branch"), + "event": run.get("event"), + "html_url": run.get("html_url"), + "matching_jobs": [job.get("name") for job in matching], + } + ) + return selected + + +def wait_for_completion(repo: str, token: str, run_ids: list[int], timeout_s: int = 180) -> None: + """Block until the cancelled runs complete and release their runners.""" + deadline = time.time() + timeout_s + pending = set(run_ids) + while pending and time.time() < deadline: + for run_id in sorted(pending): + run = github_api(repo, f"/actions/runs/{run_id}", token) + if run.get("status") == "completed": + pending.discard(run_id) + if pending: + time.sleep(10) + if pending: + print(f"::warning::Runs still not completed after {timeout_s}s: {sorted(pending)}") + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--repo", required=True) + parser.add_argument("--self-run-id", required=True, type=int) + parser.add_argument( + "--target-labels", + required=True, + help="Comma-separated runner labels the priority sweep needs (e.g. 'h100,h100-multinode').", + ) + parser.add_argument("--output", required=True, help="Path to write the preempted-runs JSON.") + parser.add_argument("--dry-run", action="store_true", help="Select and report, but do not cancel.") + args = parser.parse_args() + + token = os.environ.get("GH_TOKEN") or os.environ.get("GITHUB_TOKEN") + if not token: + raise RuntimeError("GH_TOKEN or GITHUB_TOKEN is required") + + target_labels = {label.strip() for label in args.target_labels.split(",") if label.strip()} + if not target_labels: + raise RuntimeError("--target-labels resolved to an empty set") + print(f"Target runner labels: {sorted(target_labels)}") + + runner_index: dict[str, set[str]] | None + try: + runners = paginated_github_api(args.repo, "/actions/runners", token, "runners") + runner_index = runner_labels_by_name(runners) + print(f"Resolved {len(runner_index)} self-hosted runners for label matching") + except RuntimeError as exc: + runner_index = None + print(f"::warning::Cannot list runners ({exc}); falling back to requested-label matching") + + runs: list[dict[str, Any]] = [] + for status in ("in_progress", "queued"): + runs.extend( + paginated_github_api( + args.repo, "/actions/runs", token, "workflow_runs", {"status": status} + ) + ) + + runs_with_jobs = [ + ( + run, + paginated_github_api( + args.repo, f"/actions/runs/{run['id']}/jobs", token, "jobs", {"filter": "latest"} + ), + ) + for run in runs + ] + + selected = select_runs_to_preempt(runs_with_jobs, target_labels, runner_index, args.self_run_id) + for entry in selected: + print( + f"Preempting run {entry['run_id']} ({entry['workflow_name']}, {entry['head_branch']}, " + f"{entry['event']}): jobs {entry['matching_jobs']}" + ) + if not selected: + print("No runs occupy the target runners; nothing to preempt.") + + if selected and not args.dry_run: + for entry in selected: + try: + github_api(args.repo, f"/actions/runs/{entry['run_id']}/cancel", token, method="POST") + except RuntimeError as exc: + # Already finished/cancelling is fine; the goal is a free runner. + print(f"::warning::Cancel of run {entry['run_id']} returned: {exc}") + wait_for_completion(args.repo, token, [entry["run_id"] for entry in selected]) + + with open(args.output, "w") as handle: + json.dump(selected, handle, indent=2) + print(f"Wrote {len(selected)} preempted run(s) to {args.output}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/utils/test_preempt_runners.py b/utils/test_preempt_runners.py new file mode 100644 index 000000000..a73ec2250 --- /dev/null +++ b/utils/test_preempt_runners.py @@ -0,0 +1,83 @@ +"""Tests for the preemption selection logic in preempt_runners.py.""" + +from preempt_runners import job_matches_targets, runner_labels_by_name, select_runs_to_preempt + +RUNNER_INDEX = { + "b200-dgxc_06": {"self-hosted", "b200", "b200-dsv4", "b200-dgxc", "b200-multinode"}, + "b200-dgxc_07": {"self-hosted", "b200", "b200-dsv4", "b200-dgxc", "b200-disagg"}, + "h100-dgxc-slurm_06": {"self-hosted", "h100", "h100-dgxc"}, +} + + +def job(status: str, labels: list[str], runner_name: str | None = None, name: str = "j") -> dict: + return {"status": status, "labels": labels, "runner_name": runner_name, "name": name} + + +def run(run_id: int, name: str = "Run Sweep", branch: str = "some-branch") -> dict: + return { + "id": run_id, + "name": name, + "head_branch": branch, + "event": "pull_request", + "html_url": f"https://example.com/runs/{run_id}", + } + + +class TestJobMatchesTargets: + def test_in_progress_job_matched_via_runner_label_set(self): + # The job requested b200-multinode but occupies a node that also + # serves b200 — a b200 priority sweep must preempt it. + j = job("in_progress", ["b200-multinode"], runner_name="b200-dgxc_06") + assert job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + def test_in_progress_job_on_unrelated_runner_not_matched(self): + j = job("in_progress", ["h100"], runner_name="h100-dgxc-slurm_06") + assert not job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + def test_completed_job_never_matched(self): + j = job("completed", ["b200"], runner_name="b200-dgxc_06") + assert not job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + def test_queued_job_matched_when_a_target_runner_could_serve_it(self): + # Queued b200-multinode job competes for the b200 nodes we free up. + j = job("queued", ["b200-multinode"]) + assert job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + def test_queued_job_not_matched_when_no_target_runner_serves_it(self): + j = job("queued", ["h100"]) + assert not job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + def test_fallback_without_runner_index_uses_requested_labels(self): + assert job_matches_targets(job("in_progress", ["b200"]), {"b200"}, None) + assert not job_matches_targets(job("in_progress", ["b200-multinode"]), {"b200"}, None) + + def test_ubuntu_jobs_never_matched(self): + j = job("in_progress", ["ubuntu-latest"]) + assert not job_matches_targets(j, {"b200"}, RUNNER_INDEX) + + +class TestSelectRunsToPreempt: + def test_skips_self_run(self): + runs = [(run(1), [job("in_progress", ["b200"], "b200-dgxc_06")])] + assert select_runs_to_preempt(runs, {"b200"}, RUNNER_INDEX, self_run_id=1) == [] + + def test_selects_run_with_one_colliding_job(self): + runs = [ + (run(1), [job("in_progress", ["b200"], "b200-dgxc_06", name="bmk-b200")]), + (run(2), [job("in_progress", ["h100"], "h100-dgxc-slurm_06")]), + ] + selected = select_runs_to_preempt(runs, {"b200"}, RUNNER_INDEX, self_run_id=99) + assert [entry["run_id"] for entry in selected] == [1] + assert selected[0]["matching_jobs"] == ["bmk-b200"] + + def test_run_with_only_completed_jobs_not_selected(self): + runs = [(run(1), [job("completed", ["b200"], "b200-dgxc_06")])] + assert select_runs_to_preempt(runs, {"b200"}, RUNNER_INDEX, self_run_id=99) == [] + + +def test_runner_labels_by_name(): + runners = [ + {"name": "b200-dgxc_06", "labels": [{"name": "b200"}, {"name": "b200-multinode"}]}, + {"labels": [{"name": "ignored-no-name"}]}, + ] + assert runner_labels_by_name(runners) == {"b200-dgxc_06": {"b200", "b200-multinode"}}