diff --git a/src/agents/realtime/session.py b/src/agents/realtime/session.py index ca809dd9c4..887269cb2c 100644 --- a/src/agents/realtime/session.py +++ b/src/agents/realtime/session.py @@ -1149,11 +1149,15 @@ def _on_guardrail_task_done(self, task: asyncio.Task[Any]) -> None: ) ) - def _cleanup_guardrail_tasks(self) -> None: + def _cleanup_guardrail_tasks(self) -> list[asyncio.Task[Any]]: + """Cancel all pending guardrail tasks and return them for awaiting.""" + pending: list[asyncio.Task[Any]] = [] for task in self._guardrail_tasks: if not task.done(): task.cancel() + pending.append(task) self._guardrail_tasks.clear() + return pending def _enqueue_tool_call_task( self, @@ -1219,11 +1223,15 @@ def _on_tool_call_task_done(self, task: asyncio.Task[Any]) -> None: ) ) - def _cleanup_tool_call_tasks(self) -> None: + def _cleanup_tool_call_tasks(self) -> list[asyncio.Task[Any]]: + """Cancel all pending tool-call tasks and return them for awaiting.""" + pending: list[asyncio.Task[Any]] = [] for task in self._tool_call_tasks: if not task.done(): task.cancel() + pending.append(task) self._tool_call_tasks.clear() + return pending def _wake_event_iterators(self) -> None: for _ in range(self._event_iterator_waiters): @@ -1235,9 +1243,15 @@ async def _cleanup(self) -> None: self._wake_event_iterators() return - # Cancel and cleanup guardrail tasks - self._cleanup_guardrail_tasks() - self._cleanup_tool_call_tasks() + # Cancel and await all background tasks so their finally-blocks run + # before we close the model or mark the session closed. + # Using return_exceptions=True avoids raising CancelledError here and + # ensures every task fully drains even if some raise on cancellation. + pending_tasks = ( + self._cleanup_guardrail_tasks() + self._cleanup_tool_call_tasks() + ) + if pending_tasks: + await asyncio.gather(*pending_tasks, return_exceptions=True) # Remove ourselves as a listener self._model.remove_listener(self) diff --git a/tests/realtime/test_realtime_cleanup.py b/tests/realtime/test_realtime_cleanup.py new file mode 100644 index 0000000000..f72588475c --- /dev/null +++ b/tests/realtime/test_realtime_cleanup.py @@ -0,0 +1,146 @@ +""" +Regression tests for openai/openai-agents-python#3334. + +RealtimeSession._cleanup() must await cancelled background tasks so that +their finally-blocks fully run before _cleanup() returns. The old code +called task.cancel() and then immediately cleared the tracking sets without +ever awaiting the cancelled coroutines. + +Note on task lifecycle: asyncio tasks need at least one event-loop iteration +to start running. In real usage, background tasks run for some time before +_cleanup() is called, so they have already reached their first await. The +tests below use `await asyncio.sleep(0)` to simulate that running state +before triggering cleanup. +""" + +from __future__ import annotations + +import asyncio +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agents.realtime.agent import RealtimeAgent +from agents.realtime.session import RealtimeSession + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_fake_model() -> Any: + model = MagicMock() + model.add_listener = MagicMock() + model.remove_listener = MagicMock() + model.close = AsyncMock() + return model + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_cleanup_awaits_guardrail_task_finally_block() -> None: + """_cleanup() must not return until a cancelled guardrail task's finally block runs.""" + model = _make_fake_model() + session = RealtimeSession(model, RealtimeAgent(name="agent"), None) + + finally_ran = asyncio.Event() + + async def long_guardrail() -> None: + try: + await asyncio.Event().wait() # block until cancelled + finally: + finally_ran.set() + + task = asyncio.create_task(long_guardrail()) + session._guardrail_tasks.add(task) + + # Let the task reach its first await before we trigger cleanup + await asyncio.sleep(0) + + await session._cleanup() + + assert finally_ran.is_set(), ( + "_cleanup() returned before the cancelled guardrail task's finally-block ran" + ) + + +@pytest.mark.asyncio +async def test_cleanup_awaits_tool_call_task_finally_block() -> None: + """_cleanup() must not return until a cancelled tool-call task's finally block runs.""" + model = _make_fake_model() + session = RealtimeSession(model, RealtimeAgent(name="agent"), None) + + finally_ran = asyncio.Event() + + async def long_tool_call() -> None: + try: + await asyncio.Event().wait() + finally: + finally_ran.set() + + task = asyncio.create_task(long_tool_call()) + session._tool_call_tasks.add(task) + + # Let the task reach its first await + await asyncio.sleep(0) + + await session._cleanup() + + assert finally_ran.is_set(), ( + "_cleanup() returned before the cancelled tool-call task's finally-block ran" + ) + + +@pytest.mark.asyncio +async def test_cleanup_awaits_both_task_types_before_model_close() -> None: + """Both guardrail and tool-call finally-blocks must complete before model.close().""" + model = _make_fake_model() + order: list[str] = [] + + async def close_side_effect() -> None: + order.append("model_closed") + + model.close.side_effect = close_side_effect + + session = RealtimeSession(model, RealtimeAgent(name="agent"), None) + + async def make_task(label: str) -> None: + try: + await asyncio.Event().wait() + finally: + order.append(label) + + gtask = asyncio.create_task(make_task("guardrail_finally")) + ttask = asyncio.create_task(make_task("tool_call_finally")) + session._guardrail_tasks.add(gtask) + session._tool_call_tasks.add(ttask) + + # Let both tasks reach their first await + await asyncio.sleep(0) + + await session._cleanup() + + assert "guardrail_finally" in order + assert "tool_call_finally" in order + # Both finally-blocks must appear before model.close() + assert order.index("guardrail_finally") < order.index("model_closed"), ( + "guardrail finally-block ran after model.close()" + ) + assert order.index("tool_call_finally") < order.index("model_closed"), ( + "tool-call finally-block ran after model.close()" + ) + + +@pytest.mark.asyncio +async def test_cleanup_idempotent_when_no_pending_tasks() -> None: + """_cleanup() must succeed silently when there are no background tasks.""" + model = _make_fake_model() + session = RealtimeSession(model, RealtimeAgent(name="agent"), None) + + await session._cleanup() # must not raise + assert session._closed