Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 83 additions & 5 deletions src/libkernelbot/background_submission_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import contextlib
import datetime as dt
import logging
from collections.abc import Sequence
from dataclasses import dataclass

from libkernelbot.backend import KernelBackend
from libkernelbot.consts import SubmissionMode
from libkernelbot.kernelguard import KernelGuardRejected
from libkernelbot.report import MultiProgressReporter, RunProgressReporter, RunResultReport
from libkernelbot.run_eval import FullResult
from libkernelbot.submission import ProcessedSubmissionRequest
from libkernelbot.utils import setup_logging

Expand Down Expand Up @@ -48,6 +50,78 @@ async def display_report(self, title: str, report: RunResultReport):
pass


def _run_passed(result: FullResult | None, mode: str) -> bool:
    """Report whether the run stored under ``mode`` succeeded and passed.

    Args:
        result: Full result to inspect; ``None`` is treated as a failure.
        mode: Key into ``result.runs`` selecting which run to check.

    Returns:
        ``True`` only when ``result.success`` is truthy, ``result.runs`` has
        an entry for ``mode``, that entry carries a ``run``, and the run's
        ``success`` and ``passed`` flags are both truthy; ``False`` otherwise.
    """
    if result is not None and result.success:
        entry = result.runs.get(mode)
        # A missing entry or an entry without a run counts as "not passed".
        if entry is not None and entry.run is not None:
            return bool(entry.run.success and entry.run.passed)
    return False


def _all_recorded_runs_passed(result: FullResult | None) -> bool:
    """Report whether every run recorded in ``result`` succeeded and passed.

    Args:
        result: Full result to inspect; ``None`` is treated as a failure.

    Returns:
        ``False`` when ``result`` is ``None`` or ``result.success`` is falsy,
        or when any entry in ``result.runs`` has no ``run`` or a run whose
        ``success`` or ``passed`` flag is falsy. ``True`` otherwise, which
        includes the case where ``result.runs`` is empty.
    """
    if result is None or not result.success:
        return False

    return all(
        entry.run is not None and entry.run.success and entry.run.passed
        for entry in result.runs.values()
    )


def _ranked_completion_status(
    mode: SubmissionMode,
    gpus: Sequence[str] | None,
    results: Sequence[FullResult] | None,
) -> tuple[str, str | None]:
    """Derive the final job status for a submission from its run results.

    Only ``SubmissionMode.LEADERBOARD`` submissions are validated; any other
    mode is reported as succeeded without looking at ``results``.

    For leaderboard submissions the first ``gpu_count`` entries of ``results``
    are treated as the public runs and the following ``gpu_count`` entries as
    the secret runs, paired positionally with ``gpus``. When ``gpus`` is empty
    or ``None``, the GPU count is taken as half the number of results and
    placeholder names ``"GPU 1"``, ``"GPU 2"``, ... are used in messages.

    Args:
        mode: Submission mode of the job.
        gpus: GPU names the submission targeted, if known.
        results: Results returned for the submission, if any.

    Returns:
        ``("succeeded", None)`` when validation passes or does not apply,
        otherwise ``("failed", message)`` with a description of the first
        failure found.
    """
    if mode != SubmissionMode.LEADERBOARD:
        return "succeeded", None

    all_results = list(results) if results else []
    labels = list(gpus) if gpus else []

    # Without explicit GPU names, assume one public and one secret result per
    # GPU and label them by position.
    if not labels and all_results:
        labels = [f"GPU {i + 1}" for i in range(len(all_results) // 2)]

    expected = len(labels)
    if expected == 0 or len(all_results) < 2 * expected:
        return (
            "failed",
            "ranked validation did not complete all public and secret runs",
        )

    # Public runs: only the "leaderboard" run has to pass.
    for gpu, public_run in zip(labels, all_results[:expected], strict=True):
        if _run_passed(public_run, "leaderboard"):
            continue
        return (
            "failed",
            f"ranked validation failed on {gpu}; submission will not appear on the leaderboard",
        )

    # Secret runs: the "leaderboard" run and every other recorded run must pass.
    for gpu, secret_run in zip(labels, all_results[expected:2 * expected], strict=True):
        if _run_passed(secret_run, "leaderboard") and _all_recorded_runs_passed(secret_run):
            continue
        return (
            "failed",
            f"secret validation failed on {gpu}; submission will not appear on the leaderboard",
        )

    return "succeeded", None


def _job_status_update(
    status: str,
    last_heartbeat: dt.datetime,
    error: str | None,
) -> dict:
    """Assemble the keyword arguments for a job-status upsert.

    Args:
        status: Status string to record.
        last_heartbeat: Timestamp to record as the last heartbeat.
        error: Optional error message; omitted from the result when ``None``.

    Returns:
        A dict with ``"status"`` and ``"last_heartbeat"`` keys, plus an
        ``"error"`` key only when ``error`` is not ``None``.
    """
    base = {"status": status, "last_heartbeat": last_heartbeat}
    return base if error is None else {**base, "error": error}


class BackgroundSubmissionManager:
"""
This class manages submissions in the background. It is responsible for
Expand Down Expand Up @@ -300,18 +374,22 @@ async def heartbeat():

hb_task = asyncio.create_task(heartbeat(), name=f"hb-{sub_id}")
reporter = BackgroundSubmissionManagerReporter()
await asyncio.wait_for(
_, results = await asyncio.wait_for(
self.backend.submit_full(
item.req, item.mode, reporter, sub_id, skip_precheck=False
),
timeout=HARD_TIMEOUT_SEC,
)
ts = dt.datetime.now(dt.timezone.utc)
logger.info("[Background Job] submission %s succeeded", sub_id)
final_status, error = _ranked_completion_status(item.mode, item.req.gpus, results)
status_update = _job_status_update(final_status, ts, error)
logger.info(
"[Background Job] submission %s finished with status %s",
sub_id,
final_status,
)
with self.backend.db as db:
db.upsert_submission_job_status(
sub_id, status="succeeded", last_heartbeat=ts
)
db.upsert_submission_job_status(sub_id, **status_update)
except asyncio.TimeoutError:
ts = dt.datetime.now(dt.timezone.utc)
with self.backend.db as db:
Expand Down
52 changes: 52 additions & 0 deletions tests/test_background_submission_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from unittest import mock

import pytest
from test_report import create_eval_result, sample_system_info

from libkernelbot.background_submission_manager import BackgroundSubmissionManager
from libkernelbot.consts import SubmissionMode
from libkernelbot.kernelguard import KernelGuardRejected
from libkernelbot.run_eval import FullResult
from libkernelbot.submission import ProcessedSubmissionRequest


Expand Down Expand Up @@ -48,6 +50,17 @@ def get_req(i: int) -> ProcessedSubmissionRequest:
)


def _full_result(*modes: str, failing_mode: str | None = None) -> FullResult:
    """Build a successful ``FullResult`` holding one eval result per mode.

    Args:
        *modes: Keys to populate in ``runs``. The ``"test"`` mode is built
            with ``create_eval_result("test")``; every other mode uses
            ``create_eval_result("benchmark")``.
        failing_mode: If given, the eval result stored under this mode has
            its ``run.passed`` flag set to ``False``.

    Returns:
        A ``FullResult`` with ``success=True``, an empty error string, the
        sample system info and the generated runs.
    """

    def _build(run_mode: str):
        template = "test" if run_mode == "test" else "benchmark"
        built = create_eval_result(template)
        if run_mode == failing_mode:
            built.run.passed = False
        return built

    # Build the runs first so helper calls happen in the same order as before.
    runs = {run_mode: _build(run_mode) for run_mode in modes}
    return FullResult(success=True, error="", system=sample_system_info(), runs=runs)


@pytest.mark.asyncio
async def test_enqueue_and_run_job(mock_backend):
# mock upsert/update
Expand Down Expand Up @@ -98,6 +111,45 @@ async def fake_submit_full(req, mode, reporter, sub_id, skip_precheck=False):
await manager.stop()


@pytest.mark.asyncio
async def test_leaderboard_secret_failure_marks_job_failed(mock_backend):
    """A leaderboard job whose secret run fails must be recorded as failed.

    The fake backend returns one public and one secret result for a single
    GPU. The secret result has no "leaderboard" run and a failing "benchmark"
    run, so the job status upsert is expected to carry ``status="failed"``
    and the secret-validation error message.
    """
    db_context = mock_backend.db
    db_context.upsert_submission_job_status = mock.Mock(
        side_effect=lambda *a, **k: a[0]
    )
    db_context.update_heartbeat_if_active = mock.Mock()

    public_result = _full_result("test", "benchmark", "leaderboard")
    secret_result = _full_result("test", "benchmark", failing_mode="benchmark")

    async def fake_submit_full(req, mode, reporter, sub_id, skip_precheck=False):
        # Results are returned as [public, secret] for the single GPU.
        return sub_id, [public_result, secret_result]

    mock_backend.submit_full = fake_submit_full

    manager = BackgroundSubmissionManager(
        mock_backend, min_workers=1, max_workers=1, idle_seconds=0.1
    )
    await manager.start()
    try:
        req = get_req(1)
        req.gpus = ["A100"]
        await manager.enqueue(req, SubmissionMode.LEADERBOARD, sub_id=42)
        # Wait until the queued job has been fully processed.
        await manager.queue.join()

        assert (
            mock.call(
                42,
                status="failed",
                last_heartbeat=mock.ANY,
                error="secret validation failed on A100; submission will not appear on the leaderboard",
            )
            in db_context.upsert_submission_job_status.call_args_list
        )
    finally:
        # Always stop the manager, even when the assertion above fails, so a
        # failing test does not leave the started manager running.
        await manager.stop()


@pytest.mark.asyncio
async def test_stop_rejects_new_jobs(mock_backend):
db_context = mock_backend.db
Expand Down
Loading