fix(aws): wait for stream ready before sending audio start event #5626
theomonnom merged 1 commit into livekit:main from
    # before sending audio_content_start_event. Without this, under load
    # Bedrock may not have finished processing chat history, causing:
    # ValidationException: "Chat history should be sent completely before streaming audio."
    await self._stream_ready.wait()
_process_audio_input hangs indefinitely if _process_responses fails before setting _stream_ready
The new await self._stream_ready.wait() at line 1924 introduces a dependency on _process_responses successfully reaching self._stream_ready.set() at line 1492. If _process_responses fails before that point (e.g., await_output() at line 1489 throws due to a network error, throttling, or the assertion at line 1487 fails), the finally block at realtime_model.py:1638-1640 only clears _is_sess_active but never sets _stream_ready. This leaves _process_audio_input stuck on _stream_ready.wait() forever.
In the old code, _process_audio_input did not wait on _stream_ready before its main logic: it sent audio_content_start immediately and then awaited _is_sess_active.wait(). When _process_responses failed and cleared _is_sess_active, the audio input task could exit its while loop. The new code removes this clean exit path for the early-failure scenario. The task will only be freed when aclose() or _graceful_session_recycle externally cancels it.
Prompt for agents
The _process_responses method's finally block (around line 1638-1640) clears _is_sess_active but does not set _stream_ready. This means if _process_responses fails before reaching self._stream_ready.set() (line 1492), the _process_audio_input task will hang indefinitely on await self._stream_ready.wait() at line 1924.
To fix this, ensure _stream_ready is set in the finally block of _process_responses so that _process_audio_input can proceed and then exit via the _is_sess_active.is_set() check in its while loop. For example, adding self._stream_ready.set() in the finally block at line 1638-1640 would unblock _process_audio_input, which would then see _is_sess_active is cleared and exit the while loop normally.
Alternatively, use asyncio.wait_for or wrap the _stream_ready.wait() with a concurrent check on _is_sess_active so the task can exit if the session becomes inactive while waiting.
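The first suggestion can be illustrated with a minimal, self-contained sketch. It is not the plugin's actual code: `SessionSketch`, `run_once`, and the `fail_early` flag are hypothetical names used only for illustration. Only the two events and the two task names mirror identifiers mentioned in this review. The sketch assumes the audio task checks `_is_sess_active` after the gate opens.

```python
import asyncio


class SessionSketch:
    """Hypothetical stand-in for the realtime session; not the plugin's code."""

    def __init__(self):
        self._stream_ready = asyncio.Event()
        self._is_sess_active = asyncio.Event()
        self._is_sess_active.set()
        self.sent = []  # records events "sent" to the stream

    async def _process_responses(self, fail_early):
        try:
            if fail_early:
                # Simulates a failure before the stream is marked ready.
                raise ConnectionError("simulated failure before stream is ready")
            self._stream_ready.set()
            await asyncio.sleep(0.01)  # simulated response processing
        finally:
            self._is_sess_active.clear()
            # Always release anything blocked on the gate, even on failure.
            self._stream_ready.set()

    async def _process_audio_input(self):
        await self._stream_ready.wait()
        # The gate may have been opened by the failure path; re-check the session.
        if not self._is_sess_active.is_set():
            return
        self.sent.append("audio_content_start")


async def run_once(fail_early):
    sess = SessionSketch()
    audio = asyncio.create_task(sess._process_audio_input())
    responses = asyncio.create_task(sess._process_responses(fail_early))
    await asyncio.gather(responses, return_exceptions=True)
    # If the audio task were stuck on the gate, this would raise TimeoutError.
    await asyncio.wait_for(audio, timeout=1.0)
    return sess.sent
```

Because the `finally` block sets the event on every exit path, the audio task always gets past the gate. It then decides whether to continue based on `_is_sess_active`, so it does not rely on external cancellation.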
Problem
Under load, _process_audio_input sends audio_content_start_event before Bedrock confirms it has finished processing chat history. This causes a ValidationException: "Chat history should be sent completely before streaming audio."
The root cause is a race condition in initialize_streams: the _process_audio_input task was created before _process_responses, and immediately fired off the audio start event without waiting for the stream to be established.
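The ordering problem described above can be reproduced in isolation with a toy sketch. It is a simplified model under stated assumptions, not the plugin's implementation: `demo`, the `gated` flag, and the `history_done` label are hypothetical, and the `asyncio.sleep` call stands in for the time Bedrock takes to process chat history.

```python
import asyncio


async def demo(gated):
    events = []
    stream_ready = asyncio.Event()

    async def process_audio_input():
        if gated:
            # The gate: do not start audio until the stream is ready.
            await stream_ready.wait()
        events.append("audio_content_start")

    async def process_responses():
        await asyncio.sleep(0.01)  # simulated delay while history is processed
        events.append("history_done")
        stream_ready.set()

    # The audio task is scheduled first, matching the order in the description.
    await asyncio.gather(process_audio_input(), process_responses())
    return events
```

Without the gate, the audio start event is recorded before the history is done. With the gate, the order is reversed, regardless of which task was created first.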
Changes
Testing
No behavior change on the happy path. The fix only adds a gate that ensures the stream is ready before audio begins, which was already the implicit assumption.