Skip to content

feat: improve benchmark serving to work better under high concurrency#1102

Closed
cquil11 wants to merge 2 commits intomainfrom
improve-benchmark
Closed

feat: improve benchmark serving to work better under high concurrency#1102
cquil11 wants to merge 2 commits intomainfrom
improve-benchmark

Conversation

@cquil11
Copy link
Copy Markdown
Collaborator

@cquil11 cquil11 commented Apr 20, 2026

No description provided.

@cquil11 cquil11 requested a review from a team April 20, 2026 18:32
random.seed(seed + worker_index)
np.random.seed(seed + worker_index)
asyncio.run(_worker_async(shard, config, barrier, result_queue))
except BaseException:
# Must run even on exception so main doesn't hang.
try:
result_queue.put(None)
except Exception:
Comment on lines +419 to +420
barrier.wait()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Both barrier.wait() calls — in _worker_async (line 419) and run_benchmark (~line 597) — use no timeout, so if any worker subprocess crashes before reaching the barrier (e.g., via ZeroDivisionError when request_rate=0, an ImportError under the spawn context, or _build_client_session() raising), the barrier count is never satisfied and the main process hangs forever. The sentinel sent to result_queue in _worker_entry's finally block is irrelevant because main never reaches the drain loop — it is deadlocked at barrier.wait(). Fix: use barrier.wait(timeout=N) and catch BrokenBarrierError in both main and worker paths.

Extended reasoning...

What the bug is and how it manifests

The new multiprocess architecture uses a multiprocessing.Barrier sized num_client_workers + 1 to synchronize the start of all worker subprocesses with the main process. Both the worker side (_worker_async, inside the async with _build_client_session() block) and the main side (run_benchmark, just before benchmark_start_time) call barrier.wait() with no timeout argument. Python's multiprocessing.Barrier.wait() with no timeout blocks indefinitely until exactly num_client_workers + 1 parties have checked in. If any worker never checks in, all remaining parties (including main) block forever.

The specific code path that triggers it

In _worker_async, barrier.wait() is placed INSIDE async with _build_client_session(...) as session:. Several things can fail BEFORE reaching that line in a freshly spawned subprocess: (1) theta = 1.0 / (request_rate * burstiness) raises ZeroDivisionError when --request-rate 0 is passed (a user-visible flag), since rate_per_worker = 0.0 / num_workers = 0.0 and 1.0 / (0.0 * burstiness) raises immediately; (2) _build_client_session() raises due to resource exhaustion (TCPConnector opens file descriptors); (3) Under the spawn multiprocessing context, each worker is a fresh Python interpreter that must re-import all modules — an ImportError or any side-effect crash kills the process before _worker_entry even runs, meaning no sentinel is sent and the barrier is certainly never reached.

Why existing code doesn't prevent it

_worker_entry has a finally block that puts None (the sentinel) onto result_queue. This correctly unblocks the while sentinels < num_client_workers drain loop in main — but only if main ever reaches that loop. Main calls barrier.wait() unconditionally BEFORE entering the drain loop. If the barrier is never satisfied, main never exits barrier.wait(), never enters the drain loop, and the sentinel is wasted. The two mechanisms guard different code points and are not composed correctly.

What the impact would be

Any worker startup failure produces an unrecoverable, silent infinite hang with no error message or timeout. The benchmark tool simply freezes. Users have no way to distinguish a hung benchmark from a slow one, and must kill the process manually. The ZeroDivisionError path (--request-rate 0) is easily triggered by accident; the import-error path is triggered on any environment missing an optional dependency loaded in worker context.

How to fix it

Call barrier.wait(timeout=N) (a reasonable N is 60–120 seconds, or a configurable value) in both locations. Catch threading.BrokenBarrierError (which multiprocessing.Barrier raises after timeout or when the barrier is broken). In the worker, catching it allows the worker to exit cleanly and still send its sentinel. In main, catching it allows main to skip the drain loop (or drain with a timeout), log a useful error, and exit rather than hanging forever.

Step-by-step proof

  1. User runs: python benchmark_serving.py --backend vllm --model mymodel --request-rate 0 --num-client-workers 4
  2. run_benchmark computes rate_per_worker = 0.0 / 4 = 0.0 and puts it in config.
  3. Four worker subprocesses are spawned via ctx.Process(target=_worker_entry, ...).
  4. Each worker calls asyncio.run(_worker_async(...)), which immediately evaluates theta = 1.0 / (0.0 * 1.0)ZeroDivisionError.
  5. _worker_entry's except BaseException catches it, prints the traceback, and the finally block puts None to result_queue (4 sentinels total, one per worker).
  6. barrier.wait() in _worker_async is never called by any of the 4 workers.
  7. Main reaches barrier.wait() (the num_client_workers + 1 = 5th party required) and blocks indefinitely — only 0 of the required 5 parties have arrived.
  8. Main never reaches the while sentinels < num_client_workers loop.
  9. The 4 sentinels sit unconsumed in result_queue. The benchmark hangs forever.

Comment on lines +407 to +412
def flush_batch(force: bool = False) -> None:
if not batch_buffer:
return
if force or len(batch_buffer) >= _WORKER_QUEUE_BATCH_SIZE:
result_queue.put(batch_buffer.copy())
batch_buffer.clear()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The synchronous result_queue.put() call inside flush_batch() can block the asyncio event loop when the bounded queue fills up. When blocked, no network I/O can proceed, sem.release() in _fire's finally block is delayed stalling the dispatch loop, and all pending coroutines are frozen — corrupting the timing measurements this PR aims to improve. Fix by offloading the put to a thread via loop.run_in_executor(), or use put_nowait with overflow/retry handling.

Extended reasoning...

What the bug is and how it manifests

flush_batch() is a plain synchronous function that calls result_queue.put(batch_buffer.copy()) with the default block=True semantics of multiprocessing.Queue. It is invoked directly (no await) from within the _fire() async coroutine running on the worker's asyncio event loop thread. Because asyncio is single-threaded and cooperative, any blocking call on that thread suspends the entire event loop until the call returns.

The specific code path that triggers it

_fire() awaits the HTTP request, appends the result to batch_buffer, then calls flush_batch() synchronously. Inside flush_batch(), when len(batch_buffer) >= _WORKER_QUEUE_BATCH_SIZE (64) or force=True, result_queue.put() is called. The queue is bounded at maxsize = num_client_workers * 32 (e.g. 256 slots for 8 workers). Each slot holds a batch of up to 64 outputs. At high QPS or with a slow main process, this ceiling can be reached.

Why existing code does not prevent it

The main process drains the queue in a tight loop (result_queue.get()), which ordinarily keeps the queue well below capacity. However, transient OS scheduling decisions, GC pauses, or a burst of completions from all workers simultaneously can fill the queue faster than main drains it. At that point, result_queue.put() calls into the OS-level semaphore/mutex inside the multiprocessing Queue implementation and blocks indefinitely with no timeout.

What the impact would be

While result_queue.put() is blocked: (1) the asyncio event loop is completely frozen — no HTTP response chunks can be received, no other coroutines can be scheduled; (2) the finally: sem.release() block in _fire cannot execute, so that concurrency slot is permanently held; (3) the dispatch loop awaiting sem.acquire() is also stalled, preventing new requests from being sent. This cascading delay inflates measured latency and distorts throughput figures — directly undermining the goal of this PR.

Step-by-step proof

Suppose num_client_workers=8, so maxsize=256. Workers fire requests at high QPS; with batch size 64, after 256 successful put() calls (16 384 result objects buffered), the queue is full. Worker 0 finishes another batch of 64 and calls flush_batch() -> result_queue.put(). Main is busy computing calculate_metrics or paused by GC. put() blocks on the internal OS semaphore. Worker 0's event loop is now frozen: the pending streaming HTTP responses it has open cannot deliver chunks, tasks awaiting sem.acquire() are stuck, and no new requests are dispatched. Measured TTFT and e2el for all in-flight requests inflate proportionally to the stall duration.

How to fix it

Replace the synchronous result_queue.put() with an async-safe call. The cleanest fix is: loop = asyncio.get_event_loop(); await loop.run_in_executor(None, result_queue.put, batch_buffer.copy()) — this offloads the blocking put to a thread pool worker while the event loop continues processing I/O. Alternatively, result_queue.put_nowait() with retry/back-off via asyncio.sleep() avoids the thread overhead but requires more care around queue-full handling.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Development

Successfully merging this pull request may close these issues.

2 participants