Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 46 additions & 32 deletions src/agentex/lib/adk/_modules/_codex_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
reasoning -> reasoning:
item.started -> StreamTaskMessageStart(ReasoningContent)
item.updated -> (no-op; final text arrives on completed)
item.completed -> StreamTaskMessageFull(ReasoningContent)
item.completed -> StreamTaskMessageDelta(ReasoningSummaryDelta)
+ StreamTaskMessageDelta(ReasoningContentDelta)
+ StreamTaskMessageDone
command_execution -> tool request + response:
item.started -> StreamTaskMessageStart(ToolRequestContent)
+ StreamTaskMessageDone
Expand Down Expand Up @@ -96,6 +98,7 @@
from agentex.types.tool_request_content import ToolRequestContent
from agentex.types.tool_response_content import ToolResponseContent
from agentex.types.reasoning_content_delta import ReasoningContentDelta
from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta

logger = make_logger(__name__)

Expand Down Expand Up @@ -366,18 +369,6 @@ def _handle_item(self, evt_type: str, item: dict[str, Any]) -> list[StreamTaskMe
),
)
)
if current:
out.append(
StreamTaskMessageDelta(
type="delta",
index=idx,
delta=ReasoningContentDelta(
type="reasoning_content",
content_index=0,
content_delta=current,
),
)
)

elif evt_type == "item.updated":
# Accumulate silently; final text arrives on item.completed.
Expand All @@ -389,30 +380,53 @@ def _handle_item(self, evt_type: str, item: dict[str, Any]) -> list[StreamTaskMe
if text:
self.reasoning_count += 1
summary = text.strip().split("\n", 1)[0][:300]
final_content = ReasoningContent(
type="reasoning",
author="agent",
summary=[summary],
content=[text],
style="static",
)
if idx is not None:
if idx is None:
# No started event was seen; open the message now.
idx = self._alloc()
out.append(
StreamTaskMessageFull(
type="full",
StreamTaskMessageStart(
type="start",
index=idx,
content=final_content,
content=ReasoningContent(
type="reasoning",
author="agent",
summary=[],
content=[],
style="active",
),
)
)
else:
# No started event was seen; emit a standalone Full.
out.append(
StreamTaskMessageFull(
type="full",
index=self._alloc(),
content=final_content,
)
# Deliver the reasoning as deltas, then close with a Done.
# Emitting a Full here instead would leave the open Start
# context dangling: auto_send routes Full into its own
# throwaway streaming context (ignoring the index), so the
# Start context survives until end-of-turn teardown and
# persists a second, near-empty reasoning message. Streaming
# the content as deltas lets the open context accumulate the
# final ReasoningContent and close cleanly as one message.
out.append(
StreamTaskMessageDelta(
type="delta",
index=idx,
delta=ReasoningSummaryDelta(
type="reasoning_summary",
summary_index=0,
summary_delta=summary,
),
)
)
out.append(
StreamTaskMessageDelta(
type="delta",
index=idx,
delta=ReasoningContentDelta(
type="reasoning_content",
content_index=0,
content_delta=text,
),
)
)
out.append(StreamTaskMessageDone(type="done", index=idx))
elif idx is not None:
# Empty reasoning block — still need to close with a Done.
out.append(StreamTaskMessageDone(type="done", index=idx))
Expand Down
29 changes: 21 additions & 8 deletions src/agentex/lib/adk/providers/_modules/sync_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from agentex import AsyncAgentex
from agentex.lib.utils.logging import make_logger
from agentex.lib.core.tracing.tracer import AsyncTracer
from agentex.types.reasoning_content import ReasoningContent
from agentex.types.task_message_delta import TextDelta
from agentex.types.task_message_update import (
StreamTaskMessageDone,
Expand Down Expand Up @@ -560,14 +561,20 @@ async def convert_openai_to_agentex_events(stream_response):
item_id_to_index[item_id] = message_index
item_id_to_type[item_id] = "reasoning_summary"

# Send a start event for this new reasoning summary message
# Send a start event for this new reasoning summary message.
# The start content must be ReasoningContent (not TextContent)
# so consumers that branch on the start event's content type
# render a reasoning/thinking indicator; the final persisted
# content is rebuilt from the reasoning deltas regardless.
yield StreamTaskMessageStart(
type="start",
index=item_id_to_index[item_id],
content=TextContent(
type="text",
content=ReasoningContent(
type="reasoning",
author="agent",
content="", # Start with empty content
summary=[],
content=[],
style="active",
),
)

Expand Down Expand Up @@ -604,14 +611,20 @@ async def convert_openai_to_agentex_events(stream_response):
item_id_to_index[item_id] = message_index
item_id_to_type[item_id] = "reasoning_content"

# Send a start event for this new reasoning content message
# Send a start event for this new reasoning content message.
# The start content must be ReasoningContent (not TextContent)
# so consumers that branch on the start event's content type
# render a reasoning/thinking indicator; the final persisted
# content is rebuilt from the reasoning deltas regardless.
yield StreamTaskMessageStart(
type="start",
index=item_id_to_index[item_id],
content=TextContent(
type="text",
content=ReasoningContent(
type="reasoning",
author="agent",
content="", # Start with empty content
summary=[],
content=[],
style="active",
),
)

Expand Down
113 changes: 81 additions & 32 deletions tests/lib/adk/test_codex_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
convert_codex_to_agentex_events,
)
from agentex.types.reasoning_content_delta import ReasoningContentDelta
from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta


async def _aiter(items: list[Any]) -> AsyncIterator[Any]:
Expand Down Expand Up @@ -365,7 +366,15 @@ async def test_tool_indices_request_before_response(self) -> None:


class TestReasoningStreaming:
async def test_reasoning_start_full(self) -> None:
async def test_reasoning_start_deltas_done(self) -> None:
"""A reasoning block opens with a Start, streams the final text as
summary + content deltas, and closes with a Done.

It must NOT emit a Full at the open Start's index: auto_send routes a
Full into a throwaway streaming context (ignoring the index), which
would leave the Start context dangling and persist a duplicate, empty
reasoning message (AGX1 codex reasoning duplicate bug).
"""
events = [
{"type": "item.started", "item": {"id": "r1", "type": "reasoning", "text": ""}},
{
Expand All @@ -380,44 +389,64 @@ async def test_reasoning_start_full(self) -> None:
out = await _collect(convert_codex_to_agentex_events(_aiter(events)))

starts = [e for e in out if isinstance(e, StreamTaskMessageStart)]
fulls = [e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)]
dones = [e for e in out if isinstance(e, StreamTaskMessageDone)]
reasoning_fulls = [
e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)
]
content_deltas = [
e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningContentDelta)
]
summary_deltas = [
e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningSummaryDelta)
]

# Exactly one message: Start + deltas + Done, all on the same index, no Full.
assert len(starts) == 1
assert isinstance(starts[0].content, ReasoningContent)
assert len(fulls) == 1
assert isinstance(fulls[0].content, ReasoningContent)
reasoning_content = fulls[0].content.content
assert reasoning_content is not None
assert any("thinking... done" in s for s in reasoning_content)

async def test_reasoning_initial_text_emits_delta(self) -> None:
events = [
{
"type": "item.started",
"item": {"id": "r1", "type": "reasoning", "text": "seed"},
},
]
out = await _collect(convert_codex_to_agentex_events(_aiter(events)))
deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta)]
assert len(deltas) == 1
assert isinstance(deltas[0].delta, ReasoningContentDelta)
assert deltas[0].delta.content_delta == "seed"

async def test_reasoning_no_started_emits_standalone_full(self) -> None:
"""If item.completed arrives without item.started, emit a standalone Full."""
assert reasoning_fulls == []
assert len(content_deltas) == 1
content_delta = content_deltas[0].delta
assert isinstance(content_delta, ReasoningContentDelta)
assert content_delta.content_delta == "thinking... done"
assert len(summary_deltas) == 1
summary_delta = summary_deltas[0].delta
assert isinstance(summary_delta, ReasoningSummaryDelta)
assert summary_delta.summary_delta == "thinking... done"
assert len(dones) == 1
idx = starts[0].index
assert content_deltas[0].index == idx
assert summary_deltas[0].index == idx
assert dones[0].index == idx

async def test_reasoning_no_started_opens_and_closes_one_message(self) -> None:
"""If item.completed arrives without item.started, the converter opens a
Start lazily and closes it with a Done (still one clean message, no Full)."""
events = [
{
"type": "item.completed",
"item": {"id": "r_orphan", "type": "reasoning", "text": "orphan thought"},
}
]
out = await _collect(convert_codex_to_agentex_events(_aiter(events)))
fulls = [e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)]
assert len(fulls) == 1
assert isinstance(fulls[0].content, ReasoningContent)
orphan_content = fulls[0].content.content
assert orphan_content is not None
assert any("orphan thought" in s for s in orphan_content)

starts = [e for e in out if isinstance(e, StreamTaskMessageStart)]
dones = [e for e in out if isinstance(e, StreamTaskMessageDone)]
reasoning_fulls = [
e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent)
]
content_deltas = [
e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningContentDelta)
]

assert len(starts) == 1
assert isinstance(starts[0].content, ReasoningContent)
assert reasoning_fulls == []
assert len(content_deltas) == 1
content_delta = content_deltas[0].delta
assert isinstance(content_delta, ReasoningContentDelta)
assert content_delta.content_delta == "orphan thought"
assert len(dones) == 1
assert dones[0].index == starts[0].index
Comment on lines +441 to +449

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Lazy-open path missing ReasoningSummaryDelta assertion

The production code for the idx is None case always emits a ReasoningSummaryDelta before the ReasoningContentDelta (lines 407–416 of _codex_sync.py). This test only asserts on content_deltas and ignores summary_deltas, so a future regression that accidentally drops the summary delta on this path would go undetected. Adding the same summary_deltas assertions that test_reasoning_start_deltas_done carries would close the gap.

Prompt To Fix With AI
This is a comment left during a code review.
Path: tests/lib/adk/test_codex_sync.py
Line: 441-449

Comment:
**Lazy-open path missing `ReasoningSummaryDelta` assertion**

The production code for the `idx is None` case always emits a `ReasoningSummaryDelta` before the `ReasoningContentDelta` (lines 407–416 of `_codex_sync.py`). This test only asserts on `content_deltas` and ignores `summary_deltas`, so a future regression that accidentally drops the summary delta on this path would go undetected. Adding the same `summary_deltas` assertions that `test_reasoning_start_deltas_done` carries would close the gap.

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Fix in Claude Code


async def test_reasoning_summary_is_first_line(self) -> None:
events = [
Expand All @@ -428,9 +457,29 @@ async def test_reasoning_summary_is_first_line(self) -> None:
},
]
out = await _collect(convert_codex_to_agentex_events(_aiter(events)))
full = next(e for e in out if isinstance(e, StreamTaskMessageFull) and isinstance(e.content, ReasoningContent))
assert isinstance(full.content, ReasoningContent)
assert full.content.summary == ["line one"]
summary_event = next(
e for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, ReasoningSummaryDelta)
)
summary_delta = summary_event.delta
assert isinstance(summary_delta, ReasoningSummaryDelta)
assert summary_delta.summary_delta == "line one"

async def test_reasoning_empty_block_closes_with_done_only(self) -> None:
    """An empty reasoning block yields one Start and one matching Done.

    The block is started and completed with empty text, so the converter
    is expected to emit no deltas at all and to close the message on the
    same index the Start opened.
    """
    # Same reasoning item reported twice: once on start, once on completion.
    events = [
        {"type": evt_type, "item": {"id": "r3", "type": "reasoning", "text": ""}}
        for evt_type in ("item.started", "item.completed")
    ]
    emitted = await _collect(convert_codex_to_agentex_events(_aiter(events)))

    def _of_kind(kind: type) -> list[Any]:
        # Keep emission order while selecting a single update type.
        return [update for update in emitted if isinstance(update, kind)]

    opened = _of_kind(StreamTaskMessageStart)
    closed = _of_kind(StreamTaskMessageDone)
    streamed = _of_kind(StreamTaskMessageDelta)

    assert len(opened) == 1
    assert streamed == []
    assert len(closed) == 1
    assert closed[0].index == opened[0].index


# ---------------------------------------------------------------------------
Expand Down
Loading