diff --git a/src/agentex/lib/adk/_modules/_codex_sync.py b/src/agentex/lib/adk/_modules/_codex_sync.py index b2b162a24..5a951d57e 100644 --- a/src/agentex/lib/adk/_modules/_codex_sync.py +++ b/src/agentex/lib/adk/_modules/_codex_sync.py @@ -48,7 +48,9 @@ reasoning -> reasoning: item.started -> StreamTaskMessageStart(ReasoningContent) item.updated -> (no-op; final text arrives on completed) - item.completed -> StreamTaskMessageFull(ReasoningContent) + item.completed -> StreamTaskMessageDelta(ReasoningSummaryDelta) + + StreamTaskMessageDelta(ReasoningContentDelta) + + StreamTaskMessageDone command_execution -> tool request + response: item.started -> StreamTaskMessageStart(ToolRequestContent) + StreamTaskMessageDone @@ -96,6 +98,7 @@ from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta logger = make_logger(__name__) @@ -366,18 +369,6 @@ def _handle_item(self, evt_type: str, item: dict[str, Any]) -> list[StreamTaskMe ), ) ) - if current: - out.append( - StreamTaskMessageDelta( - type="delta", - index=idx, - delta=ReasoningContentDelta( - type="reasoning_content", - content_index=0, - content_delta=current, - ), - ) - ) elif evt_type == "item.updated": # Accumulate silently; final text arrives on item.completed. @@ -389,30 +380,53 @@ def _handle_item(self, evt_type: str, item: dict[str, Any]) -> list[StreamTaskMe if text: self.reasoning_count += 1 summary = text.strip().split("\n", 1)[0][:300] - final_content = ReasoningContent( - type="reasoning", - author="agent", - summary=[summary], - content=[text], - style="static", - ) - if idx is not None: + if idx is None: + # No started event was seen; open the message now. + idx = self._alloc() out.append( - StreamTaskMessageFull( - type="full", + StreamTaskMessageStart( + type="start", index=idx, - content=final_content, + content=ReasoningContent( + type="reasoning", + author="agent", + summary=[], + content=[], + style="active", + ), ) ) - else: - # No started event was seen; emit a standalone Full. - out.append( - StreamTaskMessageFull( - type="full", - index=self._alloc(), - content=final_content, - ) + # Deliver the reasoning as deltas, then close with a Done. + # Emitting a Full here instead would leave the open Start + # context dangling: auto_send routes Full into its own + # throwaway streaming context (ignoring the index), so the + # Start context survives until end-of-turn teardown and + # persists a second, near-empty reasoning message. Streaming + # the content as deltas lets the open context accumulate the + # final ReasoningContent and close cleanly as one message. + out.append( + StreamTaskMessageDelta( + type="delta", + index=idx, + delta=ReasoningSummaryDelta( + type="reasoning_summary", + summary_index=0, + summary_delta=summary, + ), ) + ) + out.append( + StreamTaskMessageDelta( + type="delta", + index=idx, + delta=ReasoningContentDelta( + type="reasoning_content", + content_index=0, + content_delta=text, + ), + ) + ) + out.append(StreamTaskMessageDone(type="done", index=idx)) elif idx is not None: # Empty reasoning block — still need to close with a Done. out.append(StreamTaskMessageDone(type="done", index=idx)) diff --git a/src/agentex/lib/adk/providers/_modules/sync_provider.py b/src/agentex/lib/adk/providers/_modules/sync_provider.py index 9996bf30d..d1d5e1c09 100644 --- a/src/agentex/lib/adk/providers/_modules/sync_provider.py +++ b/src/agentex/lib/adk/providers/_modules/sync_provider.py @@ -32,6 +32,7 @@ from agentex import AsyncAgentex from agentex.lib.utils.logging import make_logger from agentex.lib.core.tracing.tracer import AsyncTracer +from agentex.types.reasoning_content import ReasoningContent from agentex.types.task_message_delta import TextDelta from agentex.types.task_message_update import ( StreamTaskMessageDone, @@ -560,14 +561,20 @@ async def convert_openai_to_agentex_events(stream_response): item_id_to_index[item_id] = message_index item_id_to_type[item_id] = "reasoning_summary" - # Send a start event for this new reasoning summary message + # Send a start event for this new reasoning summary message. + # The start content must be ReasoningContent (not TextContent) + # so consumers that branch on the start event's content type + # render a reasoning/thinking indicator; the final persisted + # content is rebuilt from the reasoning deltas regardless. yield StreamTaskMessageStart( type="start", index=item_id_to_index[item_id], - content=TextContent( - type="text", + content=ReasoningContent( + type="reasoning", author="agent", - content="", # Start with empty content + summary=[], + content=[], + style="active", ), ) @@ -604,14 +611,20 @@ async def convert_openai_to_agentex_events(stream_response): item_id_to_index[item_id] = message_index item_id_to_type[item_id] = "reasoning_content" - # Send a start event for this new reasoning content message + # Send a start event for this new reasoning content message. + # The start content must be ReasoningContent (not TextContent) + # so consumers that branch on the start event's content type + # render a reasoning/thinking indicator; the final persisted + # content is rebuilt from the reasoning deltas regardless. yield StreamTaskMessageStart( type="start", index=item_id_to_index[item_id], - content=TextContent( - type="text", + content=ReasoningContent( + type="reasoning", author="agent", - content="", # Start with empty content + summary=[], + content=[], + style="active", ), ) diff --git a/tests/lib/adk/test_codex_sync.py b/tests/lib/adk/test_codex_sync.py index d0093e5dd..644688dfb 100644 --- a/tests/lib/adk/test_codex_sync.py +++ b/tests/lib/adk/test_codex_sync.py @@ -35,6 +35,7 @@ convert_codex_to_agentex_events, ) from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta async def _aiter(items: list[Any]) -> AsyncIterator[Any]: @@ -365,7 +366,15 @@ async def test_tool_indices_request_before_response(self) -> None: class TestReasoningStreaming: - async def test_reasoning_start_full(self) -> None: + async def test_reasoning_start_deltas_done(self) -> None: + """A reasoning block opens with a Start, streams the final text as + summary + content deltas, and closes with a Done. + + It must NOT emit a Full at the open Start's index: auto_send routes a + Full into a throwaway streaming context (ignoring the index), which + would leave the Start context dangling and persist a duplicate, empty + reasoning message (AGX1 codex reasoning duplicate bug). + """ events = [ {"type": "item.started", "item": {"id": "r1", "type": "reasoning", "text": ""}}, { @@ -380,31 +389,38 @@ async def test_reasoning_start_full(self) -> None: out = await _collect(convert_codex_to_agentex_events(_aiter(events))) starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] - fulls = [e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + reasoning_fulls = [ + e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent) + ] + content_deltas = [ + e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningContentDelta) + ] + summary_deltas = [ + e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningSummaryDelta) + ] + # Exactly one message: Start + deltas + Done, all on the same index, no Full. assert len(starts) == 1 assert isinstance(starts[0].content, ReasoningContent) - assert len(fulls) == 1 - assert isinstance(fulls[0].content, ReasoningContent) - reasoning_content = fulls[0].content.content - assert reasoning_content is not None - assert any("thinking... done" in s for s in reasoning_content) - - async def test_reasoning_initial_text_emits_delta(self) -> None: - events = [ - { - "type": "item.started", - "item": {"id": "r1", "type": "reasoning", "text": "seed"}, - }, - ] - out = await _collect(convert_codex_to_agentex_events(_aiter(events))) - deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta)] - assert len(deltas) == 1 - assert isinstance(deltas[0].delta, ReasoningContentDelta) - assert deltas[0].delta.content_delta == "seed" - - async def test_reasoning_no_started_emits_standalone_full(self) -> None: - """If item.completed arrives without item.started, emit a standalone Full.""" + assert reasoning_fulls == [] + assert len(content_deltas) == 1 + content_delta = content_deltas[0].delta + assert isinstance(content_delta, ReasoningContentDelta) + assert content_delta.content_delta == "thinking... done" + assert len(summary_deltas) == 1 + summary_delta = summary_deltas[0].delta + assert isinstance(summary_delta, ReasoningSummaryDelta) + assert summary_delta.summary_delta == "thinking... done" + assert len(dones) == 1 + idx = starts[0].index + assert content_deltas[0].index == idx + assert summary_deltas[0].index == idx + assert dones[0].index == idx + + async def test_reasoning_no_started_opens_and_closes_one_message(self) -> None: + """If item.completed arrives without item.started, the converter opens a + Start lazily and closes it with a Done (still one clean message, no Full).""" events = [ { "type": "item.completed", @@ -412,12 +428,25 @@ async def test_reasoning_no_started_emits_standalone_full(self) -> None: } ] out = await _collect(convert_codex_to_agentex_events(_aiter(events))) - fulls = [e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)] - assert len(fulls) == 1 - assert isinstance(fulls[0].content, ReasoningContent) - orphan_content = fulls[0].content.content - assert orphan_content is not None - assert any("orphan thought" in s for s in orphan_content) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + reasoning_fulls = [ + e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent) + ] + content_deltas = [ + e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningContentDelta) + ] + + assert len(starts) == 1 + assert isinstance(starts[0].content, ReasoningContent) + assert reasoning_fulls == [] + assert len(content_deltas) == 1 + content_delta = content_deltas[0].delta + assert isinstance(content_delta, ReasoningContentDelta) + assert content_delta.content_delta == "orphan thought" + assert len(dones) == 1 + assert dones[0].index == starts[0].index async def test_reasoning_summary_is_first_line(self) -> None: events = [ @@ -428,9 +457,29 @@ async def test_reasoning_summary_is_first_line(self) -> None: }, ] out = await _collect(convert_codex_to_agentex_events(_aiter(events))) - full = next(e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)) - assert isinstance(full.content, ReasoningContent) - assert full.content.summary == ["line one"] + summary_event = next( + e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningSummaryDelta) + ) + summary_delta = summary_event.delta + assert isinstance(summary_delta, ReasoningSummaryDelta) + assert summary_delta.summary_delta == "line one" + + async def test_reasoning_empty_block_closes_with_done_only(self) -> None: + """A reasoning block that completes with no text still closes its Start.""" + events = [ + {"type": "item.started", "item": {"id": "r3", "type": "reasoning", "text": ""}}, + {"type": "item.completed", "item": {"id": "r3", "type": "reasoning", "text": ""}}, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta)] + + assert len(starts) == 1 + assert deltas == [] + assert len(dones) == 1 + assert dones[0].index == starts[0].index # ---------------------------------------------------------------------------