Skip to content

Commit 7b7dc66

Browse files
committed
feat: improve CoalescingBuffer to park on idle, reducing CPU usage
1 parent f312fef commit 7b7dc66

2 files changed

Lines changed: 94 additions & 10 deletions

File tree

src/agentex/lib/core/services/adk/streaming.py

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,12 @@ def __init__(self, on_flush: Callable[[StreamTaskMessageDelta], Awaitable[object
166166
self._first_flushed = False
167167
self._closed = False
168168
self._lock = asyncio.Lock()
169-
self._flush_signal = asyncio.Event()
169+
# Two events so the ticker can park at zero CPU when idle:
170+
# _wake — buffer went empty -> non-empty; the ticker should run
171+
# _flush_now — flush immediately (first delta / size threshold / close),
172+
# bypassing the coalescing window
173+
self._wake = asyncio.Event()
174+
self._flush_now = asyncio.Event()
170175
self._task: asyncio.Task[None] | None = None
171176

172177
def start(self) -> None:
@@ -177,22 +182,42 @@ async def add(self, update: StreamTaskMessageDelta) -> None:
177182
if self._closed:
178183
return
179184
async with self._lock:
185+
was_empty = not self._buf
180186
self._buf.append(update)
181187
self._buf_chars += _delta_char_len(update.delta)
182188
if not self._first_flushed or self._buf_chars >= self.MAX_BUFFERED_CHARS:
183189
self._first_flushed = True
184-
self._flush_signal.set()
190+
self._flush_now.set()
191+
# Wake the (possibly parked) ticker when the buffer goes from empty
192+
# to non-empty; it then applies the coalescing window itself.
193+
if was_empty:
194+
self._wake.set()
185195

186196
async def _run(self) -> None:
187197
try:
188198
while True:
189-
try:
190-
await asyncio.wait_for(self._flush_signal.wait(), timeout=self.FLUSH_INTERVAL_S)
191-
except asyncio.TimeoutError:
192-
pass
199+
# Park at zero CPU until there is data to flush (or close()).
200+
# This is the key change from a fixed-interval ticker: an idle
201+
# or orphaned buffer blocks here instead of waking every
202+
# FLUSH_INTERVAL_S forever — the latter leaked CPU when a buffer
203+
# outlived its stream without close() running (one spinning task
204+
# per such stream).
205+
await self._wake.wait()
206+
self._wake.clear()
207+
# First delta / size threshold / close flush immediately;
208+
# otherwise coalesce for up to FLUSH_INTERVAL_S so consecutive
209+
# deltas batch into a single publish.
210+
if not self._flush_now.is_set() and not self._closed:
211+
try:
212+
await asyncio.wait_for(self._flush_now.wait(), timeout=self.FLUSH_INTERVAL_S)
213+
except asyncio.TimeoutError:
214+
pass
193215
async with self._lock:
194-
self._flush_signal.clear()
216+
self._flush_now.clear()
195217
drained = self._drain_locked()
218+
# Data that arrived during the flush keeps the ticker running.
219+
if self._buf:
220+
self._wake.set()
196221
for u in drained:
197222
try:
198223
await self._on_flush(u)
@@ -215,12 +240,17 @@ async def close(self) -> None:
215240
# producing the duplicate-tail symptom seen on the UI stream.
216241
self._closed = True
217242
if self._task is not None:
218-
self._flush_signal.set()
243+
# Wake the parked ticker so it sees _closed and exits after its
244+
# next drain.
245+
self._wake.set()
246+
self._flush_now.set()
219247
try:
220248
await self._task
221249
except asyncio.CancelledError:
222-
# Propagate if our caller is being cancelled; the task itself
223-
# swallows CancelledError so this only fires on outer cancel.
250+
# Our caller is being cancelled. Force-cancel the ticker so it
251+
# can never be orphaned into a parked/looping task, then
252+
# propagate the cancellation.
253+
self._task.cancel()
224254
raise
225255
self._task = None
226256
async with self._lock:

tests/lib/core/services/adk/test_streaming.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,60 @@ async def on_flush(u: StreamTaskMessageDelta) -> None:
303303
await buf.close()
304304

305305

306+
class TestCoalescingBufferIdleParks:
307+
"""Regression: the ticker must park (block on its wake event) when there is
308+
no buffered data, instead of waking every FLUSH_INTERVAL_S. The old
309+
fixed-interval ticker spun at 1/FLUSH_INTERVAL forever, so a buffer that
310+
outlived its stream (orphaned, close() not run) pinned worker CPU — one
311+
spinning task per such stream.
312+
"""
313+
314+
@staticmethod
315+
def _count_drains(buf: CoalescingBuffer) -> list[int]:
316+
"""Instrument _drain_locked to count ticker wake/drain cycles."""
317+
n = [0]
318+
orig = buf._drain_locked
319+
320+
def counting() -> list[StreamTaskMessageDelta]:
321+
n[0] += 1
322+
return orig()
323+
324+
buf._drain_locked = counting # type: ignore[method-assign]
325+
return n
326+
327+
@pytest.mark.asyncio
328+
async def test_idle_buffer_does_not_spin(self) -> None:
329+
"""With no data ever added, the ticker must not drain at all over many
330+
FLUSH_INTERVAL_S windows."""
331+
buf = CoalescingBuffer(on_flush=AsyncMock())
332+
drains = self._count_drains(buf)
333+
buf.start()
334+
try:
335+
# ~8 windows at FLUSH_INTERVAL_S=0.050; a polling ticker would have
336+
# woken ~8 times. A parked ticker drains 0 times.
337+
await asyncio.sleep(0.4)
338+
assert drains[0] == 0, f"idle ticker woke {drains[0]}x (must park at 0)"
339+
finally:
340+
await buf.close()
341+
342+
@pytest.mark.asyncio
343+
async def test_orphaned_buffer_parks_after_flush(self, task_message: TaskMessage) -> None:
344+
"""A buffer whose close() never runs (orphaned on an abnormal stream
345+
exit) must still park at zero CPU once its data is drained — not spin.
346+
This is the exact condition that previously leaked worker CPU."""
347+
buf = CoalescingBuffer(on_flush=AsyncMock())
348+
buf.start()
349+
try:
350+
await buf.add(_text(task_message, "hi")) # one immediate flush
351+
await asyncio.sleep(0.020) # let it flush and park
352+
drains = self._count_drains(buf)
353+
# Deliberately do NOT close — simulate an orphaned buffer.
354+
await asyncio.sleep(0.4)
355+
assert drains[0] == 0, f"orphaned ticker woke {drains[0]}x (must park at 0)"
356+
finally:
357+
await buf.close() # cleanup only
358+
359+
306360
class TestCoalescingBufferClose:
307361
@pytest.mark.asyncio
308362
async def test_close_drains_remaining_buffered_items(self, task_message: TaskMessage) -> None:

0 commit comments

Comments
 (0)