Skip to content

feat: implement async scheduling admission control#661

Merged
eric-tramel merged 36 commits into
mainfrom
scheduling-yolo
May 21, 2026
Merged

feat: implement async scheduling admission control#661
eric-tramel merged 36 commits into
mainfrom
scheduling-yolo

Conversation

@eric-tramel
Copy link
Copy Markdown
Contributor

@eric-tramel eric-tramel commented May 14, 2026

📋 Summary

Implements the issue 645 async scheduling epic by splitting runtime control into explicit scheduler task admission and concrete model-request admission, with typed scheduling metadata, AIMD-backed request leases, capacity snapshots, and correlated observability.

This PR now also includes the idle-time optimization pass on top of the original plan PR: adaptive row-group admission, request-pressure-aware task selection, richer scheduler telemetry, Perfetto trace export, and a canonical idle regression/report suite for future scheduler tuning.

🔗 Related Issue

Refs #645

🔄 Changes

✨ Added

  • Generator-facing SchedulingMetadata and validation in the config package.
  • Engine-side task scheduling bridge, fair ready queue views, and task admission leases.
  • ModelRequestExecutor and AdaptiveRequestAdmissionController for per-attempt provider/model/domain admission.
  • Scheduler/request observability events, runtime correlation, capacity snapshots, and deterministic benchmark harnesses.
  • Adaptive row-group admission for widening the runnable frontier when model capacity is idle and queued model demand is low.
  • Request-pressure advisory selection, which can skip a pressured model task when an eligible open peer exists.
  • Scheduler health snapshots, row-group checkpoint telemetry, advisory skip telemetry, and job-level scheduling diagnostics.
  • Canonical idle regression tooling and reports:
    • scripts/benchmarks/run_async_scheduling_idle_regression.py
    • scripts/benchmarks/generate_async_scheduling_idle_report.py
    • scripts/benchmarks/export_async_scheduling_perfetto.py
    • reports/async-scheduling-idle-regression.html
    • reports/async-scheduling-idle-analysis.html
  • Focused unit/regression tests for scheduling metadata, task admission, fair queue behavior, request admission, model request execution, async scheduler behavior, capacity reporting, idle regression guardrails, and benchmark validation.

🔧 Changed

  • Rewired the async scheduler so root and downstream work use the same ready queue, admission, task lease, worker spawn, and release flow.
  • Routed model clients through request admission and exposed request event sinks at the model-call boundary.
  • Updated architecture docs, devnotes, and Fern docs/assets from throttle-oriented language to request-admission terminology.
  • Tightened issue 645 plan contracts around task admission, request admission, migration cleanup, and capacity vocabulary.
  • Extended the benchmark harness to report total idle, starved idle, frontier/dependency-horizon idle, scheduler queue age, downstream ready gap, burstiness, request utilization, request idle, leased request wait, first model dispatch, and advisory skip counts.
  • Added combined adaptive + request-pressure benchmarking so the two scheduler adaptations are measured together rather than only in isolation.

🗑️ Removed

  • Removed legacy scheduling hint resolver code and tests.
  • Removed transport-level throttle manager/client wrapper code and tests in favor of request admission.

📈 Idle Optimization Evidence

Latest quick idle regression: PASS (0 errors, 0 warnings), 22 cases, 367 checks.

  • Adaptive row groups improved the fixed-low frontier case from 10.5% to 67.7% llm utilization and reduced frontier/dependency idle from 81.3% to 14.7%.
  • Request-pressure advisory changed first dispatch from a_pressured to z_open and reduced leased request wait from 49.8ms to 3.2ms in the dedicated pressure case.
  • Combined adaptive + request-pressure improved llm utilization from 82.4% to 84.4%, request utilization from 85.6% to 89.2%, and request idle by 3.7 pp, with 193 advisory skip events. Wall time did not improve in that combined case, so this is currently a flow/utilization improvement rather than a proven throughput improvement.

🔍 Attention Areas

⚠️ Reviewers: This is intentionally a large PR against the epic branch.

🧪 Testing

  • .venv/bin/ruff check packages scripts tests_e2e
  • .venv/bin/ruff format --check packages scripts tests_e2e
  • git diff --check
  • Config package tests: 570 passed
  • Engine package tests: 1,995 passed
  • Interface package tests: 899 passed, 1 skipped
  • Benchmark schema and derived metric checks passed
  • Stale throttle/scheduling term cleanup gates passed, excluding expected migration/removal references
  • Live GPT-5.5 benchmark lanes: max_parallel, AIMD, short/long, fan-in/fan-out, bottleneck workloads
  • Live GPT-5 Nano 1024+ benchmark lanes: cap scale, AIMD scale, fan scale, mixed GPT-5.5 -> Nano pipeline
  • uv run ruff check scripts/benchmarks/benchmark_async_scheduling.py scripts/benchmarks/generate_async_scheduling_idle_report.py scripts/benchmarks/run_async_scheduling_idle_regression.py packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py
  • uv run ruff format --check scripts/benchmarks/benchmark_async_scheduling.py scripts/benchmarks/generate_async_scheduling_idle_report.py scripts/benchmarks/run_async_scheduling_idle_regression.py packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py
  • uv run pytest packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py -q — 17 passed
  • uv run python scripts/benchmarks/run_async_scheduling_idle_regression.py --quick ... — PASS, 0 errors, 0 warnings
  • make test as a single aggregate command was not rerun; equivalent package suites above passed earlier, and focused checks passed for this optimization pass
  • E2E tests added/updated: N/A for this PR; live provider benchmark artifacts were collected instead

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated
  • Idle regression report and guardrails added

Notes

Raw live benchmark traces and large artifact trees remain local because the full artifact tree is large. This PR includes condensed reports and reusable benchmark tooling so reviewers can inspect the evidence and rerun the canonical regression suite without committing raw JSONL/timeline dumps.


Description updated with AI

eric-tramel and others added 10 commits May 14, 2026 10:46
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Resolve the second review pass over plans/645 by making the Markdown spec and UML source agree on task admission, request admission, capacity, telemetry, benchmark, migration, and issue-map contracts.

Key updates include canonical event names, richer AsyncCapacityPlan fields, request waiter and cancellation semantics, timed wakeups, retry/salvage lease ordering, and clearer public/internal documentation boundaries.

Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 14, 2026

MkDocs preview: https://81ff1f64.dd-docs-preview.pages.dev

Fern preview: https://nvidia-preview-pr-661.docs.buildwithfern.com/nemo/datadesigner

Fern previews include the docs-website version archive with PR changes synced into latest. Notebook tutorials are rendered without execution outputs in previews.

Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
@eric-tramel eric-tramel marked this pull request as ready for review May 18, 2026 16:39
@eric-tramel eric-tramel requested a review from a team as a code owner May 18, 2026 16:39
@nabinchha
Copy link
Copy Markdown
Contributor

Thanks for putting this together, @eric-tramel — this is a substantial reshaping of the runtime control surfaces and the new module ownership reads cleanly. Here are my thoughts.

Summary

This PR splits runtime control into explicit scheduler task admission (FairTaskQueue + TaskAdmissionController) and provider/model request admission (AdaptiveRequestAdmissionController + ModelRequestExecutor), retires the old ThrottleManager/ThrottledModelClient stack, adds an observability/capacity surface, and updates architecture docs and the published Fern site to match. The implementation matches the PR description and the contracts laid out under plans/645/.

Findings

Warnings — Worth addressing

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/completion.py:347_stable_task_id duplicates stable_task_id and uses a function-local import

  • What: completion.py defines _stable_task_id(task) with the same hashing recipe as resources.stable_task_id(task), plus an in-function import hashlib.
  • Why: This is both a DRY violation and a STYLEGUIDE.md violation ("Place imports at module level, not inside functions"). It also creates real drift risk: _stable_task_id produces the IDs we compare against in mark_enqueued, while stable_task_id is the one the resolver/scheduler actually attaches to SchedulableTask. If either drifts (e.g. someone changes the hash algorithm), mark_enqueued will silently stop matching and frontier dedup breaks.
  • Suggestion: Drop the local function and import the public one from resources:
from data_designer.engine.dataset_builders.scheduling.resources import stable_task_id
# ...
self._frontier = {task for task in self._frontier if stable_task_id(task) not in wanted}

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/queue.py:93,108blocked accumulator is collected but never read

  • What: FairTaskQueue.select_next builds a blocked: list[tuple[float, int, TaskGroupKey]] and appends ineligible heap entries to it, but the list is never consumed before the function returns.
  • Why: Dead code. It's not flagged by F841 (the list is mutated, just never read), so it'll quietly stick around. It also reads as if it was meant to feed diagnostics or a stable backoff that didn't make it into the PR.
  • Suggestion: Either remove the local entirely if it's leftover from refactoring, or wire it into the _DispatchOutcome/explain_blocked path you already have so callers see which groups got skipped during selection.

packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py:639-640, model_request_executor.py:245-246, dataset_builders/async_scheduler.py:369-370 — Event sink exceptions are silently swallowed

  • What: Three independent paths emit observability events through a try / except Exception: return (or continue).
  • Why: We lose any signal that the sink itself is misbehaving (e.g. a buggy custom RequestAdmissionEventSink that always raises). Given the new benchmark/QA story leans heavily on these events, a stuck sink would be effectively invisible.
  • Suggestion: Log at logging.WARNING once per kind (or with rate limiting), and document the contract on Sink.emit_* ("must not raise; failures are dropped"). At a minimum, attach exc_info=True so users can find the broken sink in their logs.
except Exception:
    logger.warning("Admission event sink raised; dropping event", exc_info=True)
    return

packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py:229-281acquire_async polls every ~50 ms instead of being woken

  • What: The async path loops with self._lock: ... ; wait = self._wait_seconds_locked(...) then await asyncio.sleep(wait) where _wait_seconds_locked clamps wait to a minimum candidate of 0.05. The threading _condition.notify_all() paths only wake acquire_sync waiters; the async coroutine is on asyncio.sleep and isn't notified by them.
  • Why: Under contention, every async lease release adds up to ~50 ms of additional latency before the next waiter sees its assigned lease. The benchmark harness will mask this on hot endpoints, but it's a measurable ceiling on scheduler-driven async traffic, and "wakeup" is in the contracts language but not implemented for asyncio.
  • Suggestion: Add a per-resource asyncio.Event (or a small list of asyncio.Futures) that _admit_waiters_locked and release set when an async waiter is admitted; switch acquire_async to asyncio.wait_for(event.wait(), timeout=wait) so we still bound the wait by deadline but stop polling. Alternatively, document the polling cadence as intentional in plans/645/request-admission.md and pull 0.05 out as a named module constant (e.g. _ASYNC_WAIT_POLL_INTERVAL_S) so it's easy to find and tune.

packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py:196-227acquire_sync will block the event loop if called from async code

  • What: acquire_sync calls self._condition.wait(timeout=wait), which holds a real OS-level wait inside a threading.Condition.
  • Why: There's no enforcement that this method is never reached from an asyncio task. _execute_sync in ModelRequestExecutor is the intended caller, but a future caller (e.g. a custom column generator that re-enters the controller) could deadlock the loop. The async/sync executor split assumes everyone reads the contract.
  • Suggestion: Either rename to acquire_blocking and add a docstring warning, or assert asyncio.get_running_loop() raises (i.e. we're not on a loop) at the top of the method:
def acquire_sync(self, item: RequestAdmissionItem) -> RequestAdmissionLease:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass
    else:
        raise RuntimeError(
            "acquire_sync would block the running event loop; use acquire_async instead."
        )
    ...

packages/data-designer-config/src/data_designer/config/run_config.pyRunConfig.throttle removed with no deprecation path

  • What: ThrottleConfig and RunConfig.throttle are deleted outright. ConfigBase enables extra="forbid", so any user code that still does RunConfig(throttle=...) will fail Pydantic validation with extra inputs are not permitted.
  • Why: This is a real public-API breaking change. The PR description correctly calls it out, but a sudden Pydantic error on first run will be confusing — users will see a validator stack trace, not "throttle has been replaced by request admission".
  • Suggestion: Add a model_validator(mode="before") that detects throttle in the input dict and raises a typed ConfigValidationError (or your equivalent canonical error) with a one-line "Replaced by adaptive request admission; see X." This keeps extra="forbid" honest while giving users a humane error. If we want to be even friendlier, accept and warn for one release. Either way, this should be in the release notes when the epic lands on main.

packages/data-designer-engine/src/data_designer/engine/column_generators/generators/base.py:166-208 — Endpoint-bucket typing forces runtime isinstance casts

  • What: endpoints: dict[tuple[str, str, str], dict[str, object]] is built up with bucket["aliases"] and bucket["caps"], and the code defends each access with if isinstance(cast_aliases, list): cast_aliases.append(...).
  • Why: The runtime checks are there only to satisfy the loose dict[str, object] typing — they can't actually fail because the only writer is this same function. The result is harder to read than the underlying logic.
  • Suggestion: Lift a small private dataclass:
@dataclass
class _EndpointBucket:
    aliases: list[str] = field(default_factory=list)
    caps: list[int] = field(default_factory=list)

endpoints: dict[tuple[str, str, str], _EndpointBucket] = {}
...
bucket = endpoints.setdefault(endpoint, _EndpointBucket())
bucket.aliases.append(alias)
bucket.caps.append(cap)

This drops the isinstance ladder and tightens the typing without expanding the public surface.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py:1042-1085 — Generic except Exception in task body classifies non-LLM bugs as drop-row events

  • What: When a task raises something that isn't in RETRYABLE_MODEL_ERRORS, the scheduler treats it as a non-retryable drop, logs a warning, and continues. So a KeyError/TypeError/AttributeError from our own scheduler/generator code looks identical to "model returned a JSON that fails validation".
  • Why: That makes legitimate engine bugs invisible behind row drops. The early-shutdown gate will eventually fire if this is widespread, but a one-off bug on a hot column path can quietly drain a large portion of a run.
  • Suggestion: Either bump the existing log to logger.error(..., exc_info=True) for non-retryable, non-ProviderError/Generation*Error types, or add an explicit allowlist of "expected provider/generator errors" and re-raise anything else into a typed DatasetGenerationError that the run knows is internal. Tests already exercise the success/failure mix; a unit test that asserts KeyError from a generator raises rather than silently drops the row would lock this in.

packages/data-designer-config/src/data_designer/config/run_config.py:55-57 — Note glued into the Attributes: section

  • What: The "Request admission is engine-internal in V1 and is not exposed as a public run-config knob." sentence sits inside the Google-style Attributes: block where every other entry documents an attribute.
  • Why: Tools that parse the docstring (and reviewers skimming) will read this as a malformed attribute entry. It also looks like a dangling note from a previous iteration.
  • Suggestion: Move it into a separate Notes: section below Attributes:, or up into the class-level summary paragraph so it's clearly a meta-note, not a field.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py:176-181 — Bounded-borrow release silently discards "credit" for non-borrowing tasks

  • What: BoundedBorrowTaskAdmissionPolicy.on_release always returns negative debt_changes for every leased resource, regardless of whether the lease originally borrowed. _apply_delta clamps to zero, so no debt goes negative — but a task that never borrowed still "repays" debt incurred by someone else.
  • Why: This is probably intentional cross-lease debt repayment (and repay_on_withheld_peer_pressure=True hints at that), but it's not obvious from the code or the test names. A future contributor reading this without architecture/ context could easily change it.
  • Suggestion: Add a one-line comment on on_release explaining "any release in the group repays group-level debt up to zero", and add a test that pins the behaviour: borrow once, release a non-borrowing lease, observe debt drops by the full lease size.

Suggestions — Take it or leave it

packages/data-designer-engine/src/data_designer/engine/observability.py:35-39RuntimeCorrelationProvider.set shadows the builtin

  • What: The method name set shadows Python's set() builtin within method bodies and reads oddly at call sites (runtime_correlation_provider.set(...)).
  • Suggestion: push or set_correlation would be more grep-able. Not a blocker.

packages/data-designer-engine/src/data_designer/engine/models/request_admission/resolver.py:20-40RequestResourceResolver is a stateless one-method class

  • What: The class has no instance state and a single resolve(...) method.
  • Suggestion: A free function resolve_request_resource(...) would convey "no state" more directly. If the plan is to swap implementations later, a Protocol with one method captures that without the class indirection. YAGNI either way.

packages/data-designer-engine/src/data_designer/engine/observability.py:100-101,128-132snapshot/request_resource_key typed as object | None

  • What: SchedulerAdmissionEvent.snapshot, RequestAdmissionEvent.request_resource_key, request_group_key, and pressure_snapshot are all object | None (presumably to avoid circular imports at runtime).
  • Suggestion: With TYPE_CHECKING-only imports you can keep the precise types in static checkers without taking a runtime cost. Sinks consuming these events would then get autocomplete instead of poking at object.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/resources.py:57-61stable_task_id uses SHA-1 where a non-cryptographic hash would do

  • What: We hash a small key string with SHA-1 and truncate to 16 hex chars per dispatch.
  • Suggestion: For task identity only, hashlib.blake2b(raw, digest_size=8).hexdigest() is a touch faster and signals "not cryptographic" by name. Not measurable on its own; mostly a readability nudge.

Duplication between FairTaskQueue and RequestFairQueue (scheduling/queue.py and models/request_admission/queue.py)

  • What: Two virtual-time fair queues with very similar shape but slightly different commit contracts and item types.
  • Suggestion: Wait until a third one shows up before extracting a base class — STYLEGUIDE.md says rule-of-three for DRY. Just flagging that the next reviewer touching either will likely want to skim both.

packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/completion.py:159-160 — Empty mark_complete body

  • What: The compatibility hook is just a docstring with no body, which works because docstrings count as a no-op statement, but reads as "did the author forget to implement this?".
  • Suggestion: Add an explicit pass or ... so it parses as intentionally empty at a glance.

packages/data-designer-config/src/data_designer/config/scheduling.py:103-118SchedulingMetadataError is defined after SchedulingMetadata

  • What: __post_init__ references SchedulingMetadataError which is defined a few classes below. It works (Python resolves at call time) but it's an unusual layout in this codebase.
  • Suggestion: Move the error class above the dataclass that raises it — mirrors the rest of data_designer.config.errors and makes the file read top-down.

What Looks Good

  • Lease lifecycle hygiene. Both controllers track controller_generation, distinguish duplicate / unknown_lease / stale_lease, and have tests that pin the duplicate-release-doesn't-double-credit invariant. That's exactly the kind of thing that bit the old throttle manager.
  • Two-phase fair queue (select_nextcommit). Non-mutating selection plus a sequence_version check makes "select, fail to admit, retry next eligible group" possible without losing fairness. Clean design.
  • Cancellation correctness in acquire_async. The race where CancelledError arrives after a lease was assigned but before the loop noticed is explicitly handled and tested (test_async_cancellation_after_waiter_assignment_releases_lease). Easy to get wrong; it's right here.
  • Doc + plan alignment. architecture/dataset-builders.md, architecture/models.md, the user-facing docs/concepts/architecture-and-performance.md, and the Fern mirrors all move in lockstep with the code, and plans/645/ documents the contracts at the level of detail this kind of refactor needs.
  • Linter clean. ruff check and ruff format --check pass on every file I touched.

Verdict

Needs changes — the duplicated _stable_task_id (with the in-function import), the dead blocked accumulator, the swallowed observability-sink exceptions, the polling/blocking semantics on acquire_async/acquire_sync, and the unguarded RunConfig.throttle removal are all worth addressing before this lands on main. None are critical blockers individually, but collectively they're the kind of subtle issues that become hard to track down once the epic ships.


This review was generated by an AI assistant.

- add adaptive row-group admission and request-pressure advisory telemetry
- add idle regression suite, HTML reports, and Perfetto export tooling
- add combined adaptive/request-pressure benchmark guardrails
- expand scheduler, request admission, and benchmark tests
@eric-tramel
Copy link
Copy Markdown
Contributor Author

Idle-time optimization and observability pass

This pass adds a layer on top of the original async scheduling plan PR. The original PR established task admission, request admission, scheduling metadata, correlated observability, and capacity snapshots. This follow-up focused on making scheduler idle time visible, measurable, and tunable.

What changed

  • Added adaptive row-group admission in the async scheduler. The scheduler can now start from a narrow admitted frontier and increase the row-group target when llm_wait capacity is idle and queued model demand is low, bounded by the configured hard cap and max-admitted-row guardrails.
  • Added request-pressure advisory task selection. When a candidate task targets a pressured request resource and an eligible peer can run against an open request resource, the scheduler can skip the pressured task for that selection pass and emit request_pressure_advisory_skipped telemetry.
  • Added richer scheduler telemetry:
    • scheduler_job_started
    • scheduler_job_completed
    • scheduler_health_snapshot
    • row_group_checkpointed
    • request_pressure_advisory_skipped
  • Added job-shape and live-health diagnostics to scheduling telemetry: active workers, active/admitted rows, active row groups, target row groups, queued demand by resource, leased/available resources, request pressure, row-group admission block reasons, and advisory skip counts.
  • Added Perfetto export tooling for benchmark timelines so scheduler events, row groups, task leases, request waits, request leases, and counters can be inspected as traces.
  • Added canonical idle regression tooling and reports:
    • scripts/benchmarks/run_async_scheduling_idle_regression.py
    • scripts/benchmarks/generate_async_scheduling_idle_report.py
    • scripts/benchmarks/export_async_scheduling_perfetto.py
    • reports/async-scheduling-idle-analysis.html
    • reports/async-scheduling-idle-regression.html
  • Extended the benchmark harness to report total idle, starved idle, frontier/dependency-horizon idle, scheduler queue age, downstream ready gap, burstiness, request utilization, request idle, request starved idle, leased request wait, first model dispatch, and advisory skip counts.
  • Added a combined adaptive-request-pressure scenario so adaptive row-group admission and request-pressure advisory are tested together instead of only in separate synthetic cases.

What improved

Latest quick idle regression result: PASS, 0 errors, 0 warnings, 22 cases, 367 checks.

  • Adaptive row-group admission materially improved the under-admitted frontier case:
    • fixed-low control: 10.5% llm utilization, 81.3% frontier/dependency idle, 4.48s wall
    • adaptive enabled: 67.7% llm utilization, 14.7% frontier/dependency idle, 0.92s wall
  • Request-pressure advisory improved the dedicated pressure scenario:
    • first model dispatch moved from a_pressured to z_open
    • leased request wait moved from 49.8ms to 3.2ms
    • advisory emitted pressure-skip telemetry (29 skips in the quick run)
  • Combined adaptive + request-pressure improved flow metrics:
    • llm utilization moved from 82.4% to 84.4%
    • request utilization moved from 85.6% to 89.2%
    • request idle improved by 3.7 pp
    • advisory emitted 193 skip events
    • first dispatch moved from a_pressured to z_open

One important caveat: the combined case did not improve wall time in the latest quick run (0.354s control vs 0.371s combined), and leased request wait moved slightly worse (488ms to 515ms). So the combined policy is currently a confirmed utilization/flow improvement, not yet a proven throughput improvement. The new regression/report suite makes that tradeoff explicit so future scheduler tuning can optimize against a stable bar.

Verification run in this pass

  • uv run ruff check scripts/benchmarks/benchmark_async_scheduling.py scripts/benchmarks/generate_async_scheduling_idle_report.py scripts/benchmarks/run_async_scheduling_idle_regression.py packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py
  • uv run ruff format --check scripts/benchmarks/benchmark_async_scheduling.py scripts/benchmarks/generate_async_scheduling_idle_report.py scripts/benchmarks/run_async_scheduling_idle_regression.py packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py
  • uv run pytest packages/data-designer-engine/tests/engine/test_async_scheduling_benchmark.py -q (17 passed)
  • uv run python scripts/benchmarks/run_async_scheduling_idle_regression.py --quick ... (PASS, 0 errors, 0 warnings)

Raw timeline/perfetto artifacts remain local; the PR includes the reusable tooling and HTML reports rather than committing the large artifact trees.

Comment thread fern/assets/owning-the-model-stack/retry-boundary.png
Comment thread artifacts/645-live-bench-nano/agent-aimd-scale/README.md Outdated
@andreatgretel
Copy link
Copy Markdown
Contributor

Did one more shallow final pass on this, plus an independent Claude cross-review at head 70974fd. No approval-blocking findings from either pass. The follow-up fixes already merged into this branch (#679, #680, #681, #683, #684, #685) look present and sane.

A few non-blocking nits / follow-ups worth considering:

  1. Please fold/merge fix: tighten request controller release semantics #682 before final approval. It fixes the remaining request-controller release semantics I was still worried about: exact release telemetry outcome and AIMD rate-limit ceiling behavior.

  2. BoundedBorrowTaskAdmissionPolicy currently treats borrow debt as group-level debt that any completed lease in the group can repay. That may be intentional, but it is a softer contract than per-borrow/per-lease debt. If we keep it, I'd document that explicitly on the policy config so future tuning does not assume strict per-borrow accounting.

  3. Scheduler event diagnostics may contain Python objects such as TaskGroupKey. In-memory sinks are fine, but JSON-style external sinks could fail and drop events. Either normalizing diagnostics at the emit boundary or documenting the sink contract would make this easier to integrate later.

  4. Adaptive row-group admission appears additive-only: it can raise the target but does not actively shrink it under later sustained pressure. That seems okay as an optimization, but docs/comments should avoid implying full AIMD-style behavior at the row-group layer unless a decrease path is added.

  5. Minor defensive edge cases Claude noticed:

    • _dispatch_selected_task rollback on worker-spawn failure could also undo _dispatched and in_flight_count.
    • The pending_pre_batch continue path could yield if no progress is possible, to avoid a pathological tight loop.
    • It would be useful to add an end-to-end timeout classification test for native adapters raising ProviderError(kind=TIMEOUT, status_code=None).
    • Direct ThrottleConfig imports now fail with a generic missing-symbol error; a friendly migration error would be nicer.

Overall I'm comfortable with these as non-blocking, assuming #682 lands before we approve/merge this epic PR. The remaining items look like doc hardening, sink-contract polish, or targeted follow-up tests rather than reasons to hold the whole async scheduling PR.

Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
Signed-off-by: Eric W. Tramel <eric.tramel@gmail.com>
…scheduling-yolo

# Conflicts:
#	plans/645/module-ownership.md
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 20, 2026

Greptile Summary

This PR implements async scheduling admission control (issue #645), splitting runtime control into explicit task-admission and request-admission layers. It introduces AIMD-backed request leases, a fair task queue with virtual-time scheduling, adaptive row-group admission, and request-pressure advisory selection.

  • New request-admission layer (controller.py, model_request_executor.py): AdaptiveRequestAdmissionController provides AIMD congestion control with per-domain lease accounting; ModelRequestExecutor wraps model clients at the API call boundary, releasing leases with outcome classification (rate_limited, success, etc.) that drives AIMD increases/decreases.
  • Task scheduler rewire (async_scheduler.py): root and downstream tasks now share a single FairTaskQueue + TaskAdmissionController path; adaptive row-group admission widens the runnable frontier when LLM capacity is idle; request-pressure advisory skips pressured tasks when an open peer exists.
  • Removed legacy throttle layer (throttle_manager.py, throttled.py deleted): replaced entirely by the new request-admission infrastructure with equivalent retry and observability coverage.

Confidence Score: 5/5

  • Safe to merge. The task-admission, request-admission, and fair-queue layers are self-consistent, and the exception classification and AIMD feedback chain have been verified end-to-end.
  • All three issues raised in prior review threads have been addressed (condition-ordering fix in _adaptive_row_group_block_reason is present in the current code, _run_generator_call wraps bare user exceptions before they reach the internal-bug classifier, _require_dataframe_result guards batch/from-scratch returns before post-processing). The AIMD decrease guard (admitted_adaptive_limit <= prev_limit), the dropped-row downstream-task suppression in _enqueue_downstream, the rate-limit → ModelRateLimitError conversion through ModelFacade, and the async CancelledError cleanup in acquire_async all check out. No new logic defects found.
  • No files require special attention.

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py Central scheduler rewired to use FairTaskQueue + TaskAdmissionController for all work; adds adaptive row-group admission, request-pressure advisory skip, generator call wrapper (_run_generator_call), and dataframe result validation (_require_dataframe_result). Logic for dropped-row handling, AIMD feedback, and advisory skip all verified correct.
packages/data-designer-engine/src/data_designer/engine/models/request_admission/controller.py New AIMD request-admission controller with threading.Condition-based sync path, asyncio.Event-based async path, CancelledError cleanup, and fair waiter queue. Lease lifecycle (acquire/release), startup ramp, and AIMD increase/decrease logic are correct; the commit-returns-None branch in _admit_waiters_locked is unreachable dead code under the lock.
packages/data-designer-engine/src/data_designer/engine/models/clients/model_request_executor.py New model-call boundary wrapping the adapter with request-admission acquire/release; correctly classifies CancelledError, ProviderError, TimeoutError, and unexpected exceptions in both sync and async paths; rate-limit errors propagate to ModelFacade which converts them to ModelRateLimitError for scheduler retry.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/completion.py Completion tracker updated with FrontierDelta and graph-backed frontier; _enqueue_downstream correctly guards against adding downstream tasks for dropped rows (row_index not in rg_dropped check), and _reevaluate_batch_tasks handles batch unblocking on row drop.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/queue.py New FairTaskQueue with virtual-time fair scheduling, lazy discard via _purge_queue_head, and sequence-version-based stale-commit detection. select_next correctly avoids mutating queue state; commit's sequence_version check is sound.
packages/data-designer-engine/src/data_designer/engine/models/request_admission/queue.py New RequestFairQueue with virtual-time fair ordering for request waiters; remove() increments _sequence_version ensuring commit() consistently rejects stale selections; _first_valid_waiter handles lazy cleanup of removed waiters.
packages/data-designer-engine/src/data_designer/engine/models/clients/factory.py Updated to wrap the adapter with ModelRequestExecutor when request_admission is provided; transport-level retries are disabled (_NO_TRANSPORT_RETRY_CONFIG) when ModelRequestExecutor handles application-level retries, correctly preventing double-retry on transient errors.

Sequence Diagram

sequenceDiagram
    participant S as AsyncScheduler
    participant FAQ as FairTaskQueue
    participant TAC as TaskAdmissionController
    participant MF as ModelFacade
    participant MRE as ModelRequestExecutor
    participant RAC as AdaptiveRequestAdmissionController
    participant API as Provider API

    S->>FAQ: enqueue(schedulable_tasks)
    loop dispatch loop
        S->>FAQ: select_next(is_dispatch_eligible)
        FAQ-->>S: QueueSelection
        S->>TAC: try_acquire(item, view)
        TAC-->>S: TaskAdmissionLease
        S->>FAQ: commit(selection)
        S->>S: spawn_worker(_execute_task)
    end

    Note over S,API: Worker execution
    S->>MF: acompletion(request)
    MF->>MRE: acompletion(request)
    MRE->>RAC: acquire_async(item)
    RAC-->>MRE: RequestAdmissionLease
    MRE->>API: HTTP POST /completions
    alt success
        API-->>MRE: 200 OK
        MRE->>RAC: release(lease, success)
        RAC->>RAC: AIMD increase (streak++)
    else rate limited
        API-->>MRE: 429 Too Many Requests
        MRE->>RAC: release(lease, rate_limited)
        RAC->>RAC: AIMD decrease + cooldown
        MRE-->>MF: ProviderError(RATE_LIMIT)
        MF-->>S: ModelRateLimitError (retryable)
    end
    S->>TAC: release(task_lease)
    S->>S: wake_event.set()
Loading

Reviews (8): Last reviewed commit: "Merge origin/main into scheduling-yolo" | Re-trigger Greptile

@andreatgretel
Copy link
Copy Markdown
Contributor

Final pass after #682 merged: no runtime blockers from my local review or Claude cross-review. The request-controller release semantics now look good, and focused checks passed locally.

Two non-blocking cleanup/docs items worth addressing before final merge or explicitly tracking as follow-ups:

  1. docs/concepts/architecture-and-performance.md still says AIMD increases until a "stabilized rate-limit threshold." Current code treats rate_limit_ceiling as telemetry/event context, not as an enforced soft cap, so the docs should describe recovery back toward max_parallel_requests and possible sawtooth behavior.

  2. Since the benchmark scripts/reports were removed from this PR, could we preserve them somewhere durable and link them from Build async scheduling benchmark harness for admission and fairness work #649? A companion branch, draft PR, or artifact bundle would be enough. It would be useful to keep the harness scripts, exact commands, PR head SHA, and representative output together so future Build async scheduling benchmark harness for admission and fairness work #649 work can start from the evidence already collected rather than reconstructing it from scratch.

Validation on current head 538891cd: GitHub/local file lists match, GitHub checks green, focused ruff, git diff --check, and 220 focused tests passed.

andreatgretel
andreatgretel previously approved these changes May 20, 2026
@nabinchha
Copy link
Copy Markdown
Contributor

One UX concern: this PR removes the public ThrottleConfig / RunConfig.throttle tuning surface, but I don’t see an equivalent replacement exposed for users.

I agree with moving runtime request control behind ModelRequestExecutor / request admission, but from a user perspective this is a regression: advanced users could previously tune cooldown, multiplicative decrease, additive increase, success window, etc. In the new path, RequestAdmissionConfig still exists with those knobs internally, but the normal factory path constructs AdaptiveRequestAdmissionController() with defaults and RunConfig rejects throttle.

Could we either:

  • expose a replacement advanced tuning surface, even if marked experimental/internal-ish;
  • keep a compatibility/deprecation path for one release; or
  • explicitly document that fine-grained AIMD tuning is no longer available and only max_parallel_requests remains user-adjustable?

Right now the behavior change is easy to miss because the adaptive logic still has tunables internally, but users no longer have a way to adjust them.

@nabinchha
Copy link
Copy Markdown
Contributor

I think two compatibility points are important enough to handle in this PR, especially because we just made plugin APIs stable. More broadly, I’d prefer that this backend refactor not introduce public API/UX breakage unless we explicitly decide that in the release plan.

  1. ThrottleConfig / RunConfig.throttle should probably remain as a compatibility surface for one release, translated into the new request-admission backend. The new backend can still be RequestAdmissionConfig internally, but the public UX should not break existing configs that tune reduce_factor, additive_increase, success_window, and cooldown_seconds. If we keep ceiling_overshoot, the request-admission controller should either preserve equivalent soft-ceiling behavior or reject/document that one field specifically.

  2. Multi-model plugin scheduling should preserve the documented get_model_aliases() UX. Current plugin docs tell authors to override get_model_aliases() for writer/judge or other multi-model plugins, and this PR’s default ColumnGeneratorWithModelRegistry.get_scheduling_metadata() raises ambiguous_model_aliases when aliases resolve to different endpoints. That means a plugin following the stable docs can start failing unless it learns a new, undocumented scheduling hook. Could we fallback to SchedulingMetadata.custom_model(...) for multi-endpoint aliases, or otherwise keep the old multi-alias behavior by default?

Both seem compatible with the new backend architecture: keep request admission and scheduling metadata internally, but preserve the public config/plugin UX with deprecation/migration guidance rather than hard-breaking it here.

@eric-tramel
Copy link
Copy Markdown
Contributor Author

Responding to @andreatgretel's final-pass notes from May 19. I pushed follow-up commits c97002a9 and da6fa56a on scheduling-yolo to close these out.

  1. Please fold/merge fix: tighten request controller release semantics #682 before final approval. It fixes the remaining request-controller release semantics I was still worried about: exact release telemetry outcome and AIMD rate-limit ceiling behavior.

Addressed before this pass: #682 is already folded into the branch. I also rechecked that this follow-up did not touch or regress the request-controller release / ceiling behavior.

  1. BoundedBorrowTaskAdmissionPolicy currently treats borrow debt as group-level debt that any completed lease in the group can repay. That may be intentional, but it is a softer contract than per-borrow/per-lease debt. If we keep it, I'd document that explicitly on the policy config so future tuning does not assume strict per-borrow accounting.

Addressed in c97002a9: documented this directly on BoundedBorrowTaskAdmissionPolicyConfig and in plans/645/task-admission.md. The contract now says borrow debt is tracked by task group and scheduler resource, and any completed lease in the same group repays released-resource debt rather than tying debt to the originally borrowed lease.

  1. Scheduler event diagnostics may contain Python objects such as TaskGroupKey. In-memory sinks are fine, but JSON-style external sinks could fail and drop events. Either normalizing diagnostics at the emit boundary or documenting the sink contract would make this easier to integrate later.

Addressed in c97002a9 and tightened in da6fa56a: SchedulerAdmissionEvent and RequestAdmissionEvent now normalize captured correlation, keys, snapshots, and diagnostics to JSON-compatible values at DTO construction. Tests cover json.dumps(asdict(event)) for dataclasses, enums, sets, tuples, snapshots, and diagnostics. plans/645/observability.md and plans/645/contracts.md now describe the JSON-compatible event shape.

  1. Adaptive row-group admission appears additive-only: it can raise the target but does not actively shrink it under later sustained pressure. That seems okay as an optimization, but docs/comments should avoid implying full AIMD-style behavior at the row-group layer unless a decrease path is added.

Addressed in c97002a9: plans/645/capacity-model.md and architecture/dataset-builders.md now state that row-group admission is fixed by default through the normal builder path, and the internal adaptive mode is additive-only ramp-up to the semaphore hard cap, not row-group AIMD shrink/recovery. The plans also call out the current request-pressure advisory as a narrow read-only path and leave broader provider/resource-aware scheduling to #651; da6fa56a aligned plans/645/architecture.md with that.

  1. Minor defensive edge cases Claude noticed:
    • _dispatch_selected_task rollback on worker-spawn failure could also undo _dispatched and in_flight_count.
    • The pending_pre_batch continue path could yield if no progress is possible, to avoid a pathological tight loop.
    • It would be useful to add an end-to-end timeout classification test for native adapters raising ProviderError(kind=TIMEOUT, status_code=None).
    • Direct ThrottleConfig imports now fail with a generic missing-symbol error; a friendly migration error would be nicer.

Addressed across c97002a9 and da6fa56a:

  • _dispatch_selected_task now rolls back _dispatched, _in_flight, row-group in_flight_count, and the task lease if worker spawning fails; regression coverage was added.
  • The pending_pre_batch no-progress path now yields with await asyncio.sleep(0); regression coverage was added.
  • Added sync and async facade-level tests proving ProviderError(kind=TIMEOUT, status_code=None) maps to ModelTimeoutError.
  • Added friendly ThrottleConfig migration errors for data_designer.config.ThrottleConfig, from data_designer.config import ThrottleConfig, from data_designer.config.run_config import ThrottleConfig, and RunConfig(throttle=...), pointing users to RunConfig.request_admission / RequestAdmissionTuningConfig.

Validation for the latest follow-up:

  • uv run ruff check packages/data-designer-config/src/data_designer/config/run_config.py packages/data-designer-config/tests/config/test_run_config.py plans/645/architecture.md plans/645/contracts.md
  • uv run ruff format --check packages/data-designer-config/src/data_designer/config/run_config.py packages/data-designer-config/tests/config/test_run_config.py
  • uv run pytest packages/data-designer-config/tests/config/test_run_config.py -q (10 passed)

@eric-tramel
Copy link
Copy Markdown
Contributor Author

Follow-up for @nabinchha's compatibility concern:

ThrottleConfig / RunConfig.throttle should probably remain as a compatibility surface for one release, translated into the new request-admission backend.

Added in aaa7eadd: ThrottleConfig is back as a deprecated public compatibility DTO, and RunConfig(throttle=...) now translates into RunConfig.request_admission=RequestAdmissionTuningConfig(...) instead of failing.

The shim maps the old throttle fields into the new request-admission DTO:

  • reduce_factor -> multiplicative_decrease_factor
  • additive_increase -> additive_increase_step
  • success_window -> increase_after_successes
  • cooldown_seconds -> cooldown_seconds

It emits a DeprecationWarning telling users to move to RunConfig.request_admission / RequestAdmissionTuningConfig. ceiling_overshoot is still accepted on ThrottleConfig for DTO compatibility, but it is not forwarded because the new request-admission controller does not expose an equivalent overshoot knob.

Also added regression coverage for:

  • ThrottleConfig export from data_designer.config
  • RunConfig(throttle=ThrottleConfig(...)) translation
  • dict-form RunConfig(throttle={...}) translation
  • rejection when both throttle and request_admission are supplied

Validation for the shim commit:

  • uv run ruff check .
  • uv run ruff format --check .
  • git diff --check
  • uv run pytest packages/data-designer-config/tests -q (581 passed)
  • uv run pytest packages/data-designer-config/tests/config/test_run_config.py packages/data-designer-engine/tests/engine/models/test_model_registry.py -q (45 passed)

@eric-tramel
Copy link
Copy Markdown
Contributor Author

Follow-up for Nabin's get_model_aliases() feedback:

Multi-model plugin scheduling should preserve the documented get_model_aliases() UX.

Updated in a06036fa so the interfaces are separated cleanly:

  • get_model_aliases() remains the existing startup health-check hook. We do not require every alias returned there to collapse to one scheduler endpoint.
  • ColumnGenerator.get_scheduling_metadata() / SchedulingMetadata is the scheduler-specific plugin hook.
  • The default ColumnGeneratorWithModelRegistry.get_scheduling_metadata() still emits precise model metadata when aliases resolve to one concrete provider/model/generation endpoint, including same-endpoint alias dedupe/min-cap behavior.
  • If a registry-backed plugin reports multiple aliases that resolve to different endpoints, the default now falls back to deterministic custom_model scheduling metadata instead of raising ambiguous_model_aliases.
  • Added regression coverage for a plugin config where get_model_aliases() returns ['draft', 'judge'] and those aliases resolve to different endpoints; scheduler construction no longer fails.

I also updated plans/645 to make this boundary explicit. Verification after the change: full engine tests (2053 passed), config tests (580 passed), focused scheduler/request-admission tests (182 passed), repo-wide ruff/format, mock engine benchmark with matching sync/async output hash, and live request-admission ramp probes showing correct ramp/cleanup behavior.

@eric-tramel eric-tramel changed the base branch from epic/645-async-scheduling to main May 20, 2026 21:03
@eric-tramel eric-tramel dismissed andreatgretel’s stale review May 20, 2026 21:03

The base branch was changed.

@eric-tramel eric-tramel requested a review from andreatgretel May 20, 2026 21:22
@eric-tramel eric-tramel merged commit c0a4dcb into main May 21, 2026
51 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants