Skip to content

Commit 21e8269

Browse files
declan-scaleclaude
andcommitted
fix(claude-code): stop dropping later-turn text/thinking blocks [greptile]
_saw_text_stream / _saw_thinking_stream were turn-wide latches: once any block was streamed via stream_event deltas, every later assistant text/thinking block that was NOT streamed got skipped (model output silently dropped). Use only the precise per-block signal (idx in _streamed_block_indexes) and reset that set (and the thinking once-guard) at each materialised-message boundary, so an earlier turn's block index can't linger and drop a later turn's block at the same index. Drop the now-dead _saw_text_stream latch. Add a regression test for a non-streamed text block in a later turn. Also (P2) replace asyncio.run() at module import in the claude-code conformance test with a loop-free driver: the fixture conversion only iterates in-memory envelopes, so it never suspends on real I/O. asyncio.run() at import raises when a loop is already running (programmatic pytest, Jupyter, session-scoped loops); the manual driver is unaffected by ambient loop state. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent ba51b5b commit 21e8269

3 files changed

Lines changed: 62 additions & 15 deletions

File tree

src/agentex/lib/adk/_modules/_claude_code_sync.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,11 @@ async def convert_claude_code_to_agentex_events(
100100
_text_index: int | None = None
101101
# Track which assistant-message block indices were already streamed via
102102
# stream_event triples. Those blocks must not be re-emitted when the full
103-
# assistant message arrives.
103+
# assistant message arrives. Reset at each message boundary (see below) so a
104+
# later turn's block indices don't collide with an earlier turn's.
104105
_streamed_block_indexes: set[int] = set()
105-
_saw_text_stream = False
106+
# Once-guard so a thinking block's pending index is claimed on its first
107+
# thinking_delta only. Reset per turn alongside _streamed_block_indexes.
106108
_saw_thinking_stream = False
107109
# For deferred ReasoningStarted: if a content_block_start(thinking) arrives
108110
# but no thinking_delta ever follows, the final assistant block's thinking
@@ -142,8 +144,9 @@ async def convert_claude_code_to_agentex_events(
142144
block_type = block.get("type", "")
143145

144146
if block_type == "text":
145-
# Skip if already delivered via stream_event deltas
146-
if _saw_text_stream or idx in _streamed_block_indexes:
147+
# Skip only the specific blocks already delivered via
148+
# stream_event deltas (per-block, not a turn-wide latch).
149+
if idx in _streamed_block_indexes:
147150
continue
148151
text = block.get("text", "")
149152
if text:
@@ -166,8 +169,9 @@ async def convert_claude_code_to_agentex_events(
166169
yield StreamTaskMessageDone(type="done", index=msg_index)
167170

168171
elif block_type == "thinking":
169-
# Skip if already delivered via stream_event deltas
170-
if _saw_thinking_stream or idx in _streamed_block_indexes:
172+
# Skip only the specific blocks already delivered via
173+
# stream_event deltas (per-block, not a turn-wide latch).
174+
if idx in _streamed_block_indexes:
171175
continue
172176
thinking_text = block.get("thinking", "")
173177
if thinking_text:
@@ -239,6 +243,13 @@ async def convert_claude_code_to_agentex_events(
239243
),
240244
)
241245

246+
# End of a materialised message: reset per-turn streaming dedup state
247+
# so the next turn's stream_event indices start clean. Without this,
248+
# a block index streamed in an earlier turn would linger in the set
249+
# and silently drop a later turn's non-streamed block at that index.
250+
_streamed_block_indexes = set()
251+
_saw_thinking_stream = False
252+
242253
# -----------------------------------------------------------------------
243254
# stream_event — incremental streaming deltas
244255
# -----------------------------------------------------------------------
@@ -277,7 +288,6 @@ async def convert_claude_code_to_agentex_events(
277288
elif btype == "text":
278289
_text_open = True
279290
_text_buf = ""
280-
_saw_text_stream = True
281291
if isinstance(block_index, int):
282292
_streamed_block_indexes.add(block_index)
283293
msg_index = next_index

tests/lib/adk/test_claude_code_sync.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,30 @@ async def test_streamed_text_not_re_emitted_by_assistant_block(self):
140140
text_starts = [e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, TextContent)]
141141
assert len(text_starts) == 1, "Text block must not be emitted twice"
142142

143+
async def test_later_turn_non_streamed_text_not_dropped(self):
144+
"""A non-streamed text block in a later turn must not be dropped because an
145+
earlier turn streamed a block at the same index."""
146+
envelopes = [
147+
# Turn 1: streamed text at index 0 (dedup'd against the materialised msg).
148+
{
149+
"type": "stream_event",
150+
"event": {"type": "content_block_start", "index": 0, "content_block": {"type": "text"}},
151+
},
152+
{
153+
"type": "stream_event",
154+
"event": {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "first"}},
155+
},
156+
{"type": "stream_event", "event": {"type": "content_block_stop", "index": 0}},
157+
{"type": "assistant", "message": {"content": [{"type": "text", "text": "first"}]}},
158+
# Turn 2: a NON-streamed text block, also at index 0.
159+
{"type": "assistant", "message": {"content": [{"type": "text", "text": "second"}]}},
160+
]
161+
out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes)))
162+
deltas = [
163+
e.delta.text_delta for e in out if isinstance(e, StreamTaskMessageDelta) and isinstance(e.delta, TextDelta)
164+
]
165+
assert deltas == ["first", "second"], "Later turn's non-streamed text must still be delivered"
166+
143167

144168
# ---------------------------------------------------------------------------
145169
# Thinking / reasoning content

tests/lib/core/harness/conformance/test_claude_code_conformance.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
from __future__ import annotations
3131

32+
from typing import Any
33+
3234
import pytest
3335

3436
from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events
@@ -151,16 +153,27 @@ async def _build_fixture(name: str, envelopes: list[dict]) -> Fixture:
151153
return Fixture(name=name, events=events)
152154

153155

154-
# We need to run the async conversion synchronously at module import time so
155-
# that fixtures are ready before pytest collects. Use asyncio.run() carefully —
156-
# this is safe in a test module since no event loop is running at import time.
157-
import asyncio as _asyncio
156+
# Fixtures must exist before pytest collects (they parametrize the test below),
157+
# so they are built at import time. The conversion only iterates in-memory
158+
# envelopes — it never suspends on a real future — so we drive the coroutine to
159+
# completion by hand instead of asyncio.run(). asyncio.run() at import raises
160+
# RuntimeError when an event loop is already running (programmatic pytest, a
161+
# Jupyter kernel, or session-scoped asyncio loops); the loop-free driver below
162+
# is unaffected by the ambient loop state.
163+
def _run_pure_async(coro: Any) -> Any:
164+
try:
165+
coro.send(None)
166+
except StopIteration as stop:
167+
return stop.value
168+
coro.close()
169+
raise RuntimeError("conformance fixture build unexpectedly suspended on real I/O")
170+
158171

159172
_FIXTURES: list[Fixture] = [
160-
_asyncio.run(_build_fixture("claude-code-text-only", _TEXT_ENVELOPES)),
161-
_asyncio.run(_build_fixture("claude-code-tool-call-result", _TOOL_ENVELOPES)),
162-
_asyncio.run(_build_fixture("claude-code-thinking-block", _THINKING_ENVELOPES)),
163-
_asyncio.run(_build_fixture("claude-code-multi-step", _MULTI_STEP_ENVELOPES)),
173+
_run_pure_async(_build_fixture("claude-code-text-only", _TEXT_ENVELOPES)),
174+
_run_pure_async(_build_fixture("claude-code-tool-call-result", _TOOL_ENVELOPES)),
175+
_run_pure_async(_build_fixture("claude-code-thinking-block", _THINKING_ENVELOPES)),
176+
_run_pure_async(_build_fixture("claude-code-multi-step", _MULTI_STEP_ENVELOPES)),
164177
]
165178

166179
# Register into the shared registry so all_fixtures() can enumerate them

0 commit comments

Comments
 (0)