From bdbadf43c23531d7525f14fead08bda6a1ad6943 Mon Sep 17 00:00:00 2001 From: Mark Saroufim Date: Sat, 4 Jul 2026 19:29:49 -0700 Subject: [PATCH] Report failed secret ranked submissions --- .../background_submission_manager.py | 88 +++++++++++++++++-- tests/test_background_submission_manager.py | 52 +++++++++++ 2 files changed, 135 insertions(+), 5 deletions(-) diff --git a/src/libkernelbot/background_submission_manager.py b/src/libkernelbot/background_submission_manager.py index c3d3d8b5..ed3a9cd2 100644 --- a/src/libkernelbot/background_submission_manager.py +++ b/src/libkernelbot/background_submission_manager.py @@ -2,12 +2,14 @@ import contextlib import datetime as dt import logging +from collections.abc import Sequence from dataclasses import dataclass from libkernelbot.backend import KernelBackend from libkernelbot.consts import SubmissionMode from libkernelbot.kernelguard import KernelGuardRejected from libkernelbot.report import MultiProgressReporter, RunProgressReporter, RunResultReport +from libkernelbot.run_eval import FullResult from libkernelbot.submission import ProcessedSubmissionRequest from libkernelbot.utils import setup_logging @@ -48,6 +50,78 @@ async def display_report(self, title: str, report: RunResultReport): pass +def _run_passed(result: FullResult | None, mode: str) -> bool: + if result is None or not result.success: + return False + + eval_result = result.runs.get(mode) + if eval_result is None or eval_result.run is None: + return False + return bool(eval_result.run.success and eval_result.run.passed) + + +def _all_recorded_runs_passed(result: FullResult | None) -> bool: + if result is None or not result.success: + return False + + for eval_result in result.runs.values(): + if eval_result.run is None or not eval_result.run.success or not eval_result.run.passed: + return False + return True + + +def _ranked_completion_status( + mode: SubmissionMode, + gpus: Sequence[str] | None, + results: Sequence[FullResult] | None, +) -> tuple[str, str | None]: + if mode != SubmissionMode.LEADERBOARD: + return "succeeded", None + + result_list = list(results or []) + gpu_names = list(gpus or []) + gpu_count = len(gpu_names) + if gpu_count == 0 and result_list: + gpu_count = len(result_list) // 2 + gpu_names = [f"GPU {i + 1}" for i in range(gpu_count)] + + if gpu_count == 0 or len(result_list) < gpu_count * 2: + return ( + "failed", + "ranked validation did not complete all public and secret runs", + ) + + public_results = result_list[:gpu_count] + secret_results = result_list[gpu_count:gpu_count * 2] + + for gpu, result in zip(gpu_names, public_results, strict=True): + if not _run_passed(result, "leaderboard"): + return ( + "failed", + f"ranked validation failed on {gpu}; submission will not appear on the leaderboard", + ) + + for gpu, result in zip(gpu_names, secret_results, strict=True): + if not _run_passed(result, "leaderboard") or not _all_recorded_runs_passed(result): + return ( + "failed", + f"secret validation failed on {gpu}; submission will not appear on the leaderboard", + ) + + return "succeeded", None + + +def _job_status_update( + status: str, + last_heartbeat: dt.datetime, + error: str | None, +) -> dict: + status_update = {"status": status, "last_heartbeat": last_heartbeat} + if error is not None: + status_update["error"] = error + return status_update + + class BackgroundSubmissionManager: """ This class manages submission in the backeground. It is responsible for @@ -300,18 +374,22 @@ async def heartbeat(): hb_task = asyncio.create_task(heartbeat(), name=f"hb-{sub_id}") reporter = BackgroundSubmissionManagerReporter() - await asyncio.wait_for( + _, results = await asyncio.wait_for( self.backend.submit_full( item.req, item.mode, reporter, sub_id, skip_precheck=False ), timeout=HARD_TIMEOUT_SEC, ) ts = dt.datetime.now(dt.timezone.utc) - logger.info("[Background Job] submission %s succeeded", sub_id) + final_status, error = _ranked_completion_status(item.mode, item.req.gpus, results) + status_update = _job_status_update(final_status, ts, error) + logger.info( + "[Background Job] submission %s finished with status %s", + sub_id, + final_status, + ) with self.backend.db as db: - db.upsert_submission_job_status( - sub_id, status="succeeded", last_heartbeat=ts - ) + db.upsert_submission_job_status(sub_id, **status_update) except asyncio.TimeoutError: ts = dt.datetime.now(dt.timezone.utc) with self.backend.db as db: diff --git a/tests/test_background_submission_manager.py b/tests/test_background_submission_manager.py index b072a17d..3ff2a05b 100644 --- a/tests/test_background_submission_manager.py +++ b/tests/test_background_submission_manager.py @@ -3,10 +3,12 @@ from unittest import mock import pytest +from test_report import create_eval_result, sample_system_info from libkernelbot.background_submission_manager import BackgroundSubmissionManager from libkernelbot.consts import SubmissionMode from libkernelbot.kernelguard import KernelGuardRejected +from libkernelbot.run_eval import FullResult from libkernelbot.submission import ProcessedSubmissionRequest @@ -48,6 +50,17 @@ def get_req(i: int) -> ProcessedSubmissionRequest: ) +def _full_result(*modes: str, failing_mode: str | None = None) -> FullResult: + runs = {} + for mode in modes: + eval_mode = "test" if mode == "test" else "benchmark" + eval_result = create_eval_result(eval_mode) + if mode == failing_mode: + eval_result.run.passed = False + runs[mode] = eval_result + return FullResult(success=True, error="", system=sample_system_info(), runs=runs) + + @pytest.mark.asyncio async def test_enqueue_and_run_job(mock_backend): # mock upsert/update @@ -98,6 +111,45 @@ async def fake_submit_full(req, mode, reporter, sub_id, skip_precheck=False): await manager.stop() +@pytest.mark.asyncio +async def test_leaderboard_secret_failure_marks_job_failed(mock_backend): + db_context = mock_backend.db + db_context.upsert_submission_job_status = mock.Mock( + side_effect=lambda *a, **k: a[0] + ) + db_context.update_heartbeat_if_active = mock.Mock() + + public_result = _full_result("test", "benchmark", "leaderboard") + secret_result = _full_result("test", "benchmark", failing_mode="benchmark") + + async def fake_submit_full(req, mode, reporter, sub_id, skip_precheck=False): + return sub_id, [public_result, secret_result] + + mock_backend.submit_full = fake_submit_full + + manager = BackgroundSubmissionManager( + mock_backend, min_workers=1, max_workers=1, idle_seconds=0.1 + ) + await manager.start() + + req = get_req(1) + req.gpus = ["A100"] + await manager.enqueue(req, SubmissionMode.LEADERBOARD, sub_id=42) + await manager.queue.join() + + assert ( + mock.call( + 42, + status="failed", + last_heartbeat=mock.ANY, + error="secret validation failed on A100; submission will not appear on the leaderboard", + ) + in db_context.upsert_submission_job_status.call_args_list + ) + + await manager.stop() + + @pytest.mark.asyncio async def test_stop_rejects_new_jobs(mock_backend): db_context = mock_backend.db