From 7bcca1088f33aae5c1b65ba42196ec439e575843 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 17:45:07 -0400 Subject: [PATCH 1/3] fix(codex): close reasoning with Done deltas instead of Full to avoid duplicate message Codex reasoning emitted Start(active) then Full(static) at the open index with no Done. auto_send routes a Full into its own throwaway streaming context (ignoring the index), so the Start context survived until end-of-turn teardown and persisted a second, near-empty reasoning message (user-visible duplicate + out-of-order). Mirror the claude_code pattern: stream the final reasoning as summary + content deltas on the open index, then close with a Done, so the open context accumulates the final ReasoningContent and closes cleanly as one message. The no-started case opens the Start lazily and closes it the same way. Updated tests assert the Start + deltas + Done sequence and that no Full(ReasoningContent) is emitted. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/agentex/lib/adk/_modules/_codex_sync.py | 78 +++++++++------ tests/lib/adk/test_codex_sync.py | 105 ++++++++++++++------ 2 files changed, 119 insertions(+), 64 deletions(-) diff --git a/src/agentex/lib/adk/_modules/_codex_sync.py b/src/agentex/lib/adk/_modules/_codex_sync.py index b2b162a24..5a951d57e 100644 --- a/src/agentex/lib/adk/_modules/_codex_sync.py +++ b/src/agentex/lib/adk/_modules/_codex_sync.py @@ -48,7 +48,9 @@ reasoning -> reasoning: item.started -> StreamTaskMessageStart(ReasoningContent) item.updated -> (no-op; final text arrives on completed) - item.completed -> StreamTaskMessageFull(ReasoningContent) + item.completed -> StreamTaskMessageDelta(ReasoningSummaryDelta) + + StreamTaskMessageDelta(ReasoningContentDelta) + + StreamTaskMessageDone command_execution -> tool request + response: item.started -> StreamTaskMessageStart(ToolRequestContent) + StreamTaskMessageDone @@ -96,6 +98,7 @@ from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta logger = make_logger(__name__) @@ -366,18 +369,6 @@ def _handle_item(self, evt_type: str, item: dict[str, Any]) -> list[StreamTaskMe ), ) ) - if current: - out.append( - StreamTaskMessageDelta( - type="delta", - index=idx, - delta=ReasoningContentDelta( - type="reasoning_content", - content_index=0, - content_delta=current, - ), - ) - ) elif evt_type == "item.updated": # Accumulate silently; final text arrives on item.completed. @@ -389,30 +380,53 @@ def _handle_item(self, evt_type: str, item: dict[str, Any]) -> list[StreamTaskMe if text: self.reasoning_count += 1 summary = text.strip().split("\n", 1)[0][:300] - final_content = ReasoningContent( - type="reasoning", - author="agent", - summary=[summary], - content=[text], - style="static", - ) - if idx is not None: + if idx is None: + # No started event was seen; open the message now. + idx = self._alloc() out.append( - StreamTaskMessageFull( - type="full", + StreamTaskMessageStart( + type="start", index=idx, - content=final_content, + content=ReasoningContent( + type="reasoning", + author="agent", + summary=[], + content=[], + style="active", + ), ) ) - else: - # No started event was seen; emit a standalone Full. - out.append( - StreamTaskMessageFull( - type="full", - index=self._alloc(), - content=final_content, - ) + # Deliver the reasoning as deltas, then close with a Done. + # Emitting a Full here instead would leave the open Start + # context dangling: auto_send routes Full into its own + # throwaway streaming context (ignoring the index), so the + # Start context survives until end-of-turn teardown and + # persists a second, near-empty reasoning message. Streaming + # the content as deltas lets the open context accumulate the + # final ReasoningContent and close cleanly as one message. + out.append( + StreamTaskMessageDelta( + type="delta", + index=idx, + delta=ReasoningSummaryDelta( + type="reasoning_summary", + summary_index=0, + summary_delta=summary, + ), ) + ) + out.append( + StreamTaskMessageDelta( + type="delta", + index=idx, + delta=ReasoningContentDelta( + type="reasoning_content", + content_index=0, + content_delta=text, + ), + ) + ) + out.append(StreamTaskMessageDone(type="done", index=idx)) elif idx is not None: # Empty reasoning block — still need to close with a Done. out.append(StreamTaskMessageDone(type="done", index=idx)) diff --git a/tests/lib/adk/test_codex_sync.py b/tests/lib/adk/test_codex_sync.py index d0093e5dd..6e30c82ab 100644 --- a/tests/lib/adk/test_codex_sync.py +++ b/tests/lib/adk/test_codex_sync.py @@ -35,6 +35,7 @@ convert_codex_to_agentex_events, ) from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta async def _aiter(items: list[Any]) -> AsyncIterator[Any]: @@ -365,7 +366,15 @@ async def test_tool_indices_request_before_response(self) -> None: class TestReasoningStreaming: - async def test_reasoning_start_full(self) -> None: + async def test_reasoning_start_deltas_done(self) -> None: + """A reasoning block opens with a Start, streams the final text as + summary + content deltas, and closes with a Done. + + It must NOT emit a Full at the open Start's index: auto_send routes a + Full into a throwaway streaming context (ignoring the index), which + would leave the Start context dangling and persist a duplicate, empty + reasoning message (AGX1 codex reasoning duplicate bug). + """ events = [ {"type": "item.started", "item": {"id": "r1", "type": "reasoning", "text": ""}}, { @@ -380,31 +389,34 @@ async def test_reasoning_start_full(self) -> None: out = await _collect(convert_codex_to_agentex_events(_aiter(events))) starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] - fulls = [e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + reasoning_fulls = [ + e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent) + ] + content_deltas = [ + e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningContentDelta) + ] + summary_deltas = [ + e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningSummaryDelta) + ] + # Exactly one message: Start + deltas + Done, all on the same index, no Full. assert len(starts) == 1 assert isinstance(starts[0].content, ReasoningContent) - assert len(fulls) == 1 - assert isinstance(fulls[0].content, ReasoningContent) - reasoning_content = fulls[0].content.content - assert reasoning_content is not None - assert any("thinking... done" in s for s in reasoning_content) - - async def test_reasoning_initial_text_emits_delta(self) -> None: - events = [ - { - "type": "item.started", - "item": {"id": "r1", "type": "reasoning", "text": "seed"}, - }, - ] - out = await _collect(convert_codex_to_agentex_events(_aiter(events))) - deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta)] - assert len(deltas) == 1 - assert isinstance(deltas[0].delta, ReasoningContentDelta) - assert deltas[0].delta.content_delta == "seed" - - async def test_reasoning_no_started_emits_standalone_full(self) -> None: - """If item.completed arrives without item.started, emit a standalone Full.""" + assert reasoning_fulls == [] + assert len(content_deltas) == 1 + assert content_deltas[0].delta.content_delta == "thinking... done" + assert len(summary_deltas) == 1 + assert summary_deltas[0].delta.summary_delta == "thinking... done" + assert len(dones) == 1 + idx = starts[0].index + assert content_deltas[0].index == idx + assert summary_deltas[0].index == idx + assert dones[0].index == idx + + async def test_reasoning_no_started_opens_and_closes_one_message(self) -> None: + """If item.completed arrives without item.started, the converter opens a + Start lazily and closes it with a Done (still one clean message, no Full).""" events = [ { "type": "item.completed", @@ -412,12 +424,23 @@ async def test_reasoning_no_started_emits_standalone_full(self) -> None: } ] out = await _collect(convert_codex_to_agentex_events(_aiter(events))) - fulls = [e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)] - assert len(fulls) == 1 - assert isinstance(fulls[0].content, ReasoningContent) - orphan_content = fulls[0].content.content - assert orphan_content is not None - assert any("orphan thought" in s for s in orphan_content) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + reasoning_fulls = [ + e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent) + ] + content_deltas = [ + e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningContentDelta) + ] + + assert len(starts) == 1 + assert isinstance(starts[0].content, ReasoningContent) + assert reasoning_fulls == [] + assert len(content_deltas) == 1 + assert content_deltas[0].delta.content_delta == "orphan thought" + assert len(dones) == 1 + assert dones[0].index == starts[0].index async def test_reasoning_summary_is_first_line(self) -> None: events = [ @@ -428,9 +451,27 @@ async def test_reasoning_summary_is_first_line(self) -> None: }, ] out = await _collect(convert_codex_to_agentex_events(_aiter(events))) - full = next(e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)) - assert isinstance(full.content, ReasoningContent) - assert full.content.summary == ["line one"] + summary_delta = next( + e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningSummaryDelta) + ) + assert summary_delta.delta.summary_delta == "line one" + + async def test_reasoning_empty_block_closes_with_done_only(self) -> None: + """A reasoning block that completes with no text still closes its Start.""" + events = [ + {"type": "item.started", "item": {"id": "r3", "type": "reasoning", "text": ""}}, + {"type": "item.completed", "item": {"id": "r3", "type": "reasoning", "text": ""}}, + ] + out = await _collect(convert_codex_to_agentex_events(_aiter(events))) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta)] + + assert len(starts) == 1 + assert deltas == [] + assert len(dones) == 1 + assert dones[0].index == starts[0].index # --------------------------------------------------------------------------- From fc90f093402901239927b334ddd97d609dfa6793 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 17:45:07 -0400 Subject: [PATCH 2/3] fix(openai): use ReasoningContent for reasoning stream Start events convert_openai_to_agentex_events opened reasoning summary/content messages with a TextContent Start. On the migrated auto_send/Temporal path this regressed the prior behavior (which started reasoning with ReasoningContent), so consumers branching on the start event's content type render reasoning as plain text. The final persisted content is rebuilt from the reasoning deltas regardless, so this only affects the live stream envelope. Aligns with the codex/claude_code taps and the langgraph-sync converter, and matches what the openai conformance suite already treats as canonical. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../adk/providers/_modules/sync_provider.py | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/src/agentex/lib/adk/providers/_modules/sync_provider.py b/src/agentex/lib/adk/providers/_modules/sync_provider.py index 9996bf30d..d1d5e1c09 100644 --- a/src/agentex/lib/adk/providers/_modules/sync_provider.py +++ b/src/agentex/lib/adk/providers/_modules/sync_provider.py @@ -32,6 +32,7 @@ from agentex import AsyncAgentex from agentex.lib.utils.logging import make_logger from agentex.lib.core.tracing.tracer import AsyncTracer +from agentex.types.reasoning_content import ReasoningContent from agentex.types.task_message_delta import TextDelta from agentex.types.task_message_update import ( StreamTaskMessageDone, @@ -560,14 +561,20 @@ async def convert_openai_to_agentex_events(stream_response): item_id_to_index[item_id] = message_index item_id_to_type[item_id] = "reasoning_summary" - # Send a start event for this new reasoning summary message + # Send a start event for this new reasoning summary message. + # The start content must be ReasoningContent (not TextContent) + # so consumers that branch on the start event's content type + # render a reasoning/thinking indicator; the final persisted + # content is rebuilt from the reasoning deltas regardless. yield StreamTaskMessageStart( type="start", index=item_id_to_index[item_id], - content=TextContent( - type="text", + content=ReasoningContent( + type="reasoning", author="agent", - content="", # Start with empty content + summary=[], + content=[], + style="active", ), ) @@ -604,14 +611,20 @@ async def convert_openai_to_agentex_events(stream_response): item_id_to_index[item_id] = message_index item_id_to_type[item_id] = "reasoning_content" - # Send a start event for this new reasoning content message + # Send a start event for this new reasoning content message. + # The start content must be ReasoningContent (not TextContent) + # so consumers that branch on the start event's content type + # render a reasoning/thinking indicator; the final persisted + # content is rebuilt from the reasoning deltas regardless. yield StreamTaskMessageStart( type="start", index=item_id_to_index[item_id], - content=TextContent( - type="text", + content=ReasoningContent( + type="reasoning", author="agent", - content="", # Start with empty content + summary=[], + content=[], + style="active", ), ) From 8540bc8155edbeba1d0c6a677c08fc62dc6cbf5d Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 17:51:29 -0400 Subject: [PATCH 3/3] fix(tests): narrow reasoning delta types for pyright in codex reasoning tests pyright does not narrow the StreamTaskMessageDelta.delta union through an isinstance filter inside a list comprehension, so accessing content_delta / summary_delta on the collected elements failed. Bind each delta to a local and assert isinstance before accessing the typed attribute. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/lib/adk/test_codex_sync.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/tests/lib/adk/test_codex_sync.py b/tests/lib/adk/test_codex_sync.py index 6e30c82ab..644688dfb 100644 --- a/tests/lib/adk/test_codex_sync.py +++ b/tests/lib/adk/test_codex_sync.py @@ -405,9 +405,13 @@ async def test_reasoning_start_deltas_done(self) -> None: assert isinstance(starts[0].content, ReasoningContent) assert reasoning_fulls == [] assert len(content_deltas) == 1 - assert content_deltas[0].delta.content_delta == "thinking... done" + content_delta = content_deltas[0].delta + assert isinstance(content_delta, ReasoningContentDelta) + assert content_delta.content_delta == "thinking... done" assert len(summary_deltas) == 1 - assert summary_deltas[0].delta.summary_delta == "thinking... done" + summary_delta = summary_deltas[0].delta + assert isinstance(summary_delta, ReasoningSummaryDelta) + assert summary_delta.summary_delta == "thinking... done" assert len(dones) == 1 idx = starts[0].index assert content_deltas[0].index == idx @@ -438,7 +442,9 @@ async def test_reasoning_no_started_opens_and_closes_one_message(self) -> None: assert isinstance(starts[0].content, ReasoningContent) assert reasoning_fulls == [] assert len(content_deltas) == 1 - assert content_deltas[0].delta.content_delta == "orphan thought" + content_delta = content_deltas[0].delta + assert isinstance(content_delta, ReasoningContentDelta) + assert content_delta.content_delta == "orphan thought" assert len(dones) == 1 assert dones[0].index == starts[0].index @@ -451,10 +457,12 @@ async def test_reasoning_summary_is_first_line(self) -> None: }, ] out = await _collect(convert_codex_to_agentex_events(_aiter(events))) - summary_delta = next( + summary_event = next( e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningSummaryDelta) ) - assert summary_delta.delta.summary_delta == "line one" + summary_delta = summary_event.delta + assert isinstance(summary_delta, ReasoningSummaryDelta) + assert summary_delta.summary_delta == "line one" async def test_reasoning_empty_block_closes_with_done_only(self) -> None: """A reasoning block that completes with no text still closes its Start."""