From 9a1d3639d1fee857459d6b01fae1e4fb72854868 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 10:02:41 -0400 Subject: [PATCH 1/2] test(harness): integration-test parity for openai, claude-code and codex Add offline sync/async/temporal integration suites for the openai, claude_code and codex harnesses (+76 tests), mirroring the existing langgraph/pydantic_ai coverage. Extend the harness-integration live-matrix to all five harnesses and switch the path trigger to a test_harness_*.py glob so new suites are picked up automatically. Co-Authored-By: Claude Opus 4.8 (1M context) --- .github/workflows/harness-integration.yml | 12 +- .../harness/test_harness_claude_code_async.py | 248 ++++++++++++++ .../harness/test_harness_claude_code_sync.py | 303 ++++++++++++++++ .../test_harness_claude_code_temporal.py | 183 ++++++++++ .../core/harness/test_harness_codex_async.py | 228 +++++++++++++ .../core/harness/test_harness_codex_sync.py | 278 +++++++++++++++ .../harness/test_harness_codex_temporal.py | 180 ++++++++++ .../core/harness/test_harness_openai_async.py | 305 +++++++++++++++++ .../core/harness/test_harness_openai_sync.py | 323 ++++++++++++++++++ .../harness/test_harness_openai_temporal.py | 195 +++++++++++ 10 files changed, 2249 insertions(+), 6 deletions(-) create mode 100644 tests/lib/core/harness/test_harness_claude_code_async.py create mode 100644 tests/lib/core/harness/test_harness_claude_code_sync.py create mode 100644 tests/lib/core/harness/test_harness_claude_code_temporal.py create mode 100644 tests/lib/core/harness/test_harness_codex_async.py create mode 100644 tests/lib/core/harness/test_harness_codex_sync.py create mode 100644 tests/lib/core/harness/test_harness_codex_temporal.py create mode 100644 tests/lib/core/harness/test_harness_openai_async.py create mode 100644 tests/lib/core/harness/test_harness_openai_sync.py create mode 100644 tests/lib/core/harness/test_harness_openai_temporal.py diff --git a/.github/workflows/harness-integration.yml b/.github/workflows/harness-integration.yml index 075ee5cf3..ab20929a8 100644 --- a/.github/workflows/harness-integration.yml +++ b/.github/workflows/harness-integration.yml @@ -7,8 +7,7 @@ on: paths: - "src/agentex/lib/core/harness/**" - "src/agentex/lib/adk/_modules/**" - - "tests/lib/core/harness/test_harness_pydantic_ai_*.py" - - "tests/lib/core/harness/test_harness_langgraph_*.py" + - "tests/lib/core/harness/test_harness_*.py" - ".github/workflows/harness-integration.yml" jobs: @@ -34,14 +33,15 @@ jobs: run: ./scripts/test tests/lib/core/harness/ -v # Offline harness integration tests (sync / async / temporal channels) for each - # migrated harness. These use fake streams / TestModel + fake streaming/tracing - # and require no live infrastructure. Future harness migration PRs (6-8) add - # their harness to the matrix below and their test paths to the triggers above. + # harness. These use fake streams / TestModel + fake streaming/tracing and + # require no live infrastructure. All five harnesses are now covered; the + # trigger above uses a `test_harness_*.py` glob so new suites are picked up + # automatically. live-matrix: runs-on: ubuntu-latest strategy: matrix: - harness: [pydantic_ai, langgraph] + harness: [pydantic_ai, langgraph, openai, claude_code, codex] channel: [sync, async, temporal] fail-fast: false name: ${{ matrix.harness }}-${{ matrix.channel }} diff --git a/tests/lib/core/harness/test_harness_claude_code_async.py b/tests/lib/core/harness/test_harness_claude_code_async.py new file mode 100644 index 000000000..c622d25c1 --- /dev/null +++ b/tests/lib/core/harness/test_harness_claude_code_async.py @@ -0,0 +1,248 @@ +"""Integration test: async (Redis-streaming) channel with a claude-code turn. + +Exercises the unified harness surface (UnifiedEmitter.auto_send_turn + ClaudeCodeTurn) +with hand-built claude-code ``stream-json`` envelopes and a fake streaming +backend so the test runs fully offline (no claude-code CLI subprocess, no +Redis, no Agentex server). + +Native envelope shapes are copied verbatim from the claude-code turn test and +conformance fixtures (assistant tool_use -> Start(ToolRequestContent)+Done; +user tool_result -> Full(ToolResponseContent); assistant text -> +Start(TextContent)+Delta+Done; result envelope -> usage). + +What is tested +-------------- +- auto_send pushes the correct message contexts: tool_request + tool_response + + text (in that order). +- TurnResult.final_text equals the final assistant text. +- TurnResult.usage reflects the claude-code ``result`` envelope (input/output + tokens, cost, num_llm_calls from num_turns). +- With a SpanTracer + fake tracing, a tool span is derived on the async path. + +What is NOT covered without live infrastructure +----------------------------------------------- +- Actual Redis streaming. +- The ACP on_task_event_send / on_task_create / on_task_cancel lifecycle. +- A real claude-code CLI subprocess / live model behaviour. + +See also: test_harness_claude_code_sync.py and test_harness_claude_code_temporal.py. +""" + +from __future__ import annotations + +from typing import Any, AsyncIterator + +import pytest + +from agentex.types.task_message import TaskMessage +from agentex.lib.core.harness.types import TurnResult +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._claude_code_turn import ClaudeCodeTurn + +from ._fakes import FakeTracing + +# --------------------------------------------------------------------------- +# Native claude-code envelope fixtures +# --------------------------------------------------------------------------- + + +def _tool_then_text_envelopes() -> list[dict[str, Any]]: + return [ + { + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "id": "call_read", + "name": "Read", + "input": {"path": "/workspace/README.md"}, + } + ] + }, + }, + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "call_read", + "content": "# My Project — temperature 72F", + } + ] + }, + }, + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "The project file says 72F."}]}, + }, + { + "type": "result", + "usage": {"input_tokens": 200, "output_tokens": 80}, + "cost_usd": 0.015, + "num_turns": 2, + }, + ] + + +async def _aiter(envelopes: list[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]: + for e in envelopes: + yield e + + +# --------------------------------------------------------------------------- +# Fake streaming backend +# --------------------------------------------------------------------------- + + +class _FakeCtx: + def __init__(self, sink: list[Any], ctype: str, initial_content: Any) -> None: + self.sink = sink + self.ctype = ctype + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) + + async def __aenter__(self) -> "_FakeCtx": + self.sink.append(("open", self.ctype, self.task_message.content)) + return self + + async def __aexit__(self, *args: Any) -> bool: + await self.close() + return False + + async def close(self) -> None: + self.sink.append(("close", self.ctype)) + + async def stream_update(self, update: Any) -> Any: + self.sink.append(("delta", self.ctype, update)) + return update + + +class _FakeStreaming: + def __init__(self) -> None: + self.sink: list[Any] = [] + self.messages_opened: list[Any] = [] + + def streaming_task_message_context( + self, + task_id: str, + initial_content: Any, + streaming_mode: str = "coalesced", + created_at: Any = None, + ) -> _FakeCtx: + ctype = getattr(initial_content, "type", None) or "" + self.messages_opened.append(initial_content) + return _FakeCtx(self.sink, ctype, initial_content) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _run_auto_send_turn( + envelopes: list[dict[str, Any]], + trace_id: str | None = None, + parent_span_id: str | None = None, + fake_tracing: FakeTracing | None = None, +) -> tuple[TurnResult, _FakeStreaming]: + fake_streaming = _FakeStreaming() + tracer: SpanTracer | bool | None = None + if trace_id and fake_tracing is not None: + tracer = SpanTracer( + trace_id=trace_id, + parent_span_id=parent_span_id, + task_id="task1", + tracing=fake_tracing, + ) + + turn = ClaudeCodeTurn(_aiter(envelopes)) + emitter = UnifiedEmitter( + task_id="task1", + trace_id=trace_id, + parent_span_id=parent_span_id, + tracer=tracer if tracer is not None else False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn) + return result, fake_streaming + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestAsyncAutoSendMessageOrder: + async def test_tool_request_pushed_before_tool_response(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) + types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] + assert "tool_request" in types + assert "tool_response" in types + assert types.index("tool_request") < types.index("tool_response") + + async def test_text_pushed_last(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) + types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] + assert types[-1] == "text", f"Expected last type=text, got {types}" + + +class TestAsyncAutoSendContent: + async def test_tool_request_content(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) + tool_reqs = [m for m in fake_streaming.messages_opened if isinstance(m, ToolRequestContent)] + assert len(tool_reqs) == 1 + assert tool_reqs[0].name == "Read" + + async def test_tool_response_content(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) + tool_resps = [m for m in fake_streaming.messages_opened if isinstance(m, ToolResponseContent)] + assert len(tool_resps) == 1 + assert "72F" in str(tool_resps[0].content) + + async def test_tool_call_ids_match(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) + tool_req = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolRequestContent)) + tool_resp = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolResponseContent)) + assert tool_req.tool_call_id == tool_resp.tool_call_id == "call_read" + + +class TestAsyncAutoSendFinalTextAndUsage: + async def test_final_text_matches_last_text(self) -> None: + result, _ = await _run_auto_send_turn(_tool_then_text_envelopes()) + assert result.final_text == "The project file says 72F." + + async def test_usage_from_result_envelope(self) -> None: + """TurnResult.usage reflects the claude-code result envelope.""" + result, _ = await _run_auto_send_turn(_tool_then_text_envelopes()) + assert result.usage is not None + assert result.usage.input_tokens == 200 + assert result.usage.output_tokens == 80 + assert result.usage.total_tokens == 280 + assert result.usage.cost_usd == pytest.approx(0.015) + assert result.usage.num_llm_calls == 2 + + async def test_context_lifecycle_open_then_close(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) + opens = [e for e in fake_streaming.sink if e[0] == "open"] + closes = [e for e in fake_streaming.sink if e[0] == "close"] + assert len(opens) == len(closes) + assert len(opens) == len(fake_streaming.messages_opened) + + +class TestAsyncAutoSendSpanDerivation: + async def test_tool_span_derived_on_async_path(self) -> None: + fake_tracing = FakeTracing() + await _run_auto_send_turn( + _tool_then_text_envelopes(), + trace_id="trace1", + parent_span_id="parent", + fake_tracing=fake_tracing, + ) + assert len(fake_tracing.started) == 1 + assert fake_tracing.started[0][0] == "Read" + assert len(fake_tracing.ended) == 1 + assert "72F" in str(fake_tracing.ended[0][1]) diff --git a/tests/lib/core/harness/test_harness_claude_code_sync.py b/tests/lib/core/harness/test_harness_claude_code_sync.py new file mode 100644 index 000000000..b53485499 --- /dev/null +++ b/tests/lib/core/harness/test_harness_claude_code_sync.py @@ -0,0 +1,303 @@ +"""Integration test: sync (HTTP-yield) channel with a claude-code turn. + +Exercises the unified harness surface (UnifiedEmitter.yield_turn + ClaudeCodeTurn) +with hand-built claude-code ``stream-json`` envelopes so the test runs fully +offline (no claude-code CLI subprocess, no API keys, no Agentex server). + +Native stream shapes +--------------------- +``ClaudeCodeTurn`` consumes an async iterator of raw claude-code stream-json +envelopes (str | dict). The envelope shapes used here are copied verbatim from +the claude-code turn test (tests/lib/adk/test_claude_code_turn.py) and the +claude-code conformance fixtures +(tests/lib/core/harness/conformance/test_claude_code_conformance.py): + + assistant text block -> Start(TextContent) + Delta + Done + assistant tool_use -> Start(ToolRequestContent) + Done + user tool_result -> Full(ToolResponseContent) + assistant thinking -> Start(ReasoningContent) + Delta + Done + +What is tested +-------------- +- The sync handler forwards StreamTaskMessage* events in canonical order: + tool_request (Start+Done) -> tool_response (Full) -> text. +- The tool_response carries the tool_result content, keyed by tool_use_id. +- With a trace_id + fake tracing, the SpanDeriver opens a tool span on + Done(tool_request) and closes it on the matching Full(tool_response), and + opens/closes a reasoning span for a thinking block. + +What is NOT covered without live infrastructure +----------------------------------------------- +- Actual HTTP streaming over the ACP sync endpoint. +- A real claude-code CLI subprocess / live model behaviour. +- The full FastACP request/response lifecycle. + +See also: test_harness_claude_code_async.py and test_harness_claude_code_temporal.py. +""" + +from __future__ import annotations + +from typing import Any, AsyncIterator, override + +from agentex.lib.core.harness.types import OpenSpan, CloseSpan +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._claude_code_turn import ClaudeCodeTurn + +from ._fakes import FakeTracing + +# --------------------------------------------------------------------------- +# Native claude-code envelope fixtures (copied from the turn + conformance tests) +# --------------------------------------------------------------------------- + + +def _tool_then_text_envelopes() -> list[dict[str, Any]]: + """tool_use -> tool_result -> final text, then a result envelope with usage.""" + return [ + { + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "id": "call_read", + "name": "Read", + "input": {"path": "/workspace/README.md"}, + } + ] + }, + }, + { + "type": "user", + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "call_read", + "content": "# My Project — temperature 72F", + } + ] + }, + }, + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "The project file says 72F."}]}, + }, + { + "type": "result", + "usage": {"input_tokens": 100, "output_tokens": 50}, + "cost_usd": 0.01, + "num_turns": 2, + }, + ] + + +def _thinking_envelopes() -> list[dict[str, Any]]: + return [ + { + "type": "assistant", + "message": { + "content": [ + {"type": "thinking", "thinking": "Let me think.\nStep 1: check the facts."}, + {"type": "text", "text": "Here is my answer."}, + ] + }, + }, + {"type": "result", "usage": {"input_tokens": 10, "output_tokens": 5}}, + ] + + +async def _aiter(envelopes: list[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]: + for e in envelopes: + yield e + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _run_yield_turn( + envelopes: list[dict[str, Any]], + trace_id: str | None = None, + parent_span_id: str | None = None, + fake_tracing: FakeTracing | None = None, +) -> list[Any]: + tracer: SpanTracer | bool | None = None + if trace_id and fake_tracing is not None: + tracer = SpanTracer( + trace_id=trace_id, + parent_span_id=parent_span_id, + task_id="task1", + tracing=fake_tracing, + ) + + turn = ClaudeCodeTurn(_aiter(envelopes)) + emitter = UnifiedEmitter( + task_id="task1", + trace_id=trace_id, + parent_span_id=parent_span_id, + tracer=tracer if tracer is not None else False, + ) + return [ev async for ev in emitter.yield_turn(turn)] + + +# --------------------------------------------------------------------------- +# Tests: event order and content +# --------------------------------------------------------------------------- + + +class TestSyncYieldEventOrder: + async def test_tool_request_precedes_tool_response(self) -> None: + events = await _run_yield_turn(_tool_then_text_envelopes()) + content_types = [ + getattr(getattr(ev, "content", None), "type", None) + for ev in events + if isinstance(ev, (StreamTaskMessageStart, StreamTaskMessageFull)) + ] + assert "tool_request" in content_types + assert "tool_response" in content_types + assert content_types.index("tool_request") < content_types.index("tool_response") + + async def test_text_appears_after_tool_response(self) -> None: + events = await _run_yield_turn(_tool_then_text_envelopes()) + tool_resp_pos = next( + i + for i, ev in enumerate(events) + if isinstance(ev, StreamTaskMessageFull) + and getattr(getattr(ev, "content", None), "type", None) == "tool_response" + ) + text_start_pos = next( + i + for i, ev in enumerate(events) + if isinstance(ev, StreamTaskMessageStart) and getattr(getattr(ev, "content", None), "type", None) == "text" + ) + assert tool_resp_pos < text_start_pos + + async def test_tool_response_carries_result_keyed_by_tool_use_id(self) -> None: + events = await _run_yield_turn(_tool_then_text_envelopes()) + full_responses = [ + ev.content + for ev in events + if isinstance(ev, StreamTaskMessageFull) and isinstance(getattr(ev, "content", None), ToolResponseContent) + ] + assert len(full_responses) == 1 + tool_response = full_responses[0] + assert isinstance(tool_response, ToolResponseContent) + assert tool_response.tool_call_id == "call_read" + assert "72F" in str(tool_response.content) + + async def test_tool_request_is_read(self) -> None: + events = await _run_yield_turn(_tool_then_text_envelopes()) + tool_reqs = [ + ev.content + for ev in events + if isinstance(getattr(ev, "content", None), ToolRequestContent) + ] + assert any(isinstance(c, ToolRequestContent) and c.name == "Read" for c in tool_reqs) + + async def test_every_start_has_matching_done(self) -> None: + events = await _run_yield_turn(_tool_then_text_envelopes()) + starts = {ev.index for ev in events if isinstance(ev, StreamTaskMessageStart)} + dones = {ev.index for ev in events if isinstance(ev, StreamTaskMessageDone)} + assert starts == dones, f"Unmatched Start/Done indices: starts={starts} dones={dones}" + + +# --------------------------------------------------------------------------- +# Tests: span derivation on the yield path +# --------------------------------------------------------------------------- + + +class TestSyncYieldSpanDerivation: + async def test_tool_span_opened_and_closed(self) -> None: + """Done(tool_request) opens a tool span; Full(tool_response) closes it.""" + fake_tracing = FakeTracing() + await _run_yield_turn( + _tool_then_text_envelopes(), + trace_id="trace1", + parent_span_id="parent-span", + fake_tracing=fake_tracing, + ) + assert len(fake_tracing.started) == 1 + assert len(fake_tracing.ended) == 1 + name, parent_id, _ = fake_tracing.started[0] + assert name == "Read" + assert parent_id == "parent-span" + + async def test_tool_span_output_is_tool_result(self) -> None: + fake_tracing = FakeTracing() + await _run_yield_turn( + _tool_then_text_envelopes(), + trace_id="trace1", + parent_span_id="parent-span", + fake_tracing=fake_tracing, + ) + name, output = fake_tracing.ended[0] + assert name == "Read" + assert "72F" in str(output) + + async def test_reasoning_span_for_thinking_block(self) -> None: + """A thinking block opens and closes a reasoning span.""" + fake_tracing = FakeTracing() + await _run_yield_turn( + _thinking_envelopes(), + trace_id="trace1", + parent_span_id="parent-span", + fake_tracing=fake_tracing, + ) + assert fake_tracing.started_names == ["reasoning"] + assert len(fake_tracing.ended) == 1 + + async def test_no_trace_id_means_no_spans(self) -> None: + fake_tracing = FakeTracing() + turn = ClaudeCodeTurn(_aiter(_tool_then_text_envelopes())) + emitter = UnifiedEmitter(task_id="task1", trace_id=None, parent_span_id=None, tracing=fake_tracing) + [_ async for _ in emitter.yield_turn(turn)] + assert fake_tracing.started == [] + assert fake_tracing.ended == [] + + async def test_tracer_false_suppresses_spans(self) -> None: + fake_tracing = FakeTracing() + turn = ClaudeCodeTurn(_aiter(_tool_then_text_envelopes())) + emitter = UnifiedEmitter( + task_id="task1", + trace_id="trace1", + parent_span_id="parent-span", + tracer=False, + tracing=fake_tracing, + ) + [_ async for _ in emitter.yield_turn(turn)] + assert fake_tracing.started == [] + assert fake_tracing.ended == [] + + async def test_span_signal_types(self) -> None: + received_signals: list[Any] = [] + + class _RecordingTracer(SpanTracer): + @override + async def handle(self, signal: Any) -> None: + received_signals.append(signal) + await super().handle(signal) + + fake_tracing = FakeTracing() + tracer = _RecordingTracer( + trace_id="trace1", + parent_span_id="parent", + task_id="task1", + tracing=fake_tracing, + ) + turn = ClaudeCodeTurn(_aiter(_tool_then_text_envelopes())) + emitter = UnifiedEmitter(task_id="task1", trace_id="trace1", parent_span_id="parent", tracer=tracer) + [_ async for _ in emitter.yield_turn(turn)] + + tool_signals = [s for s in received_signals if getattr(s, "name", None) == "Read"] + assert len(tool_signals) >= 1 + assert isinstance(received_signals[0], OpenSpan) + assert any(isinstance(s, CloseSpan) for s in received_signals) diff --git a/tests/lib/core/harness/test_harness_claude_code_temporal.py b/tests/lib/core/harness/test_harness_claude_code_temporal.py new file mode 100644 index 000000000..b643f0d20 --- /dev/null +++ b/tests/lib/core/harness/test_harness_claude_code_temporal.py @@ -0,0 +1,183 @@ +"""Integration test: Temporal channel with a claude-code turn, offline. + +The claude-code tap is a pure library adapter (no Temporal-specific helper such +as langgraph's ``stream_langgraph_events``). In a Temporal deployment the +claude-code CLI runs inside a Temporal activity and the resulting canonical +stream is delivered via the SAME ``UnifiedEmitter.auto_send_turn`` path used by +the non-temporal async channel. The only temporal-specific concern at the +harness boundary is that the activity stamps messages with a deterministic +``created_at`` (e.g. ``workflow.now()``) for replay determinism. + +This suite therefore exercises the auto_send path inside an activity-style call +plus the temporal-only contract: ``created_at`` is threaded through to every +streaming context. The native claude-code envelope shapes are copied verbatim +from the claude-code turn test / conformance fixtures. + +What is tested +-------------- +- The canonical message sequence (tool_request -> tool_response -> text) is + delivered via auto_send_turn, exactly as inside a Temporal activity. +- ``created_at`` passed to ``auto_send_turn`` is forwarded to every + ``streaming_task_message_context`` call (deterministic timestamping). +- Final text + usage from the result envelope are returned. + +What is NOT covered without live infrastructure +----------------------------------------------- +- Temporal scheduling / durability / replay behaviour. +- Redis streaming (requires a running Redis instance). +- A real claude-code CLI subprocess / live model behaviour. + +See also: test_harness_claude_code_sync.py and test_harness_claude_code_async.py. +""" + +from __future__ import annotations + +from typing import Any, AsyncIterator +from datetime import datetime, timezone + +from agentex.types.task_message import TaskMessage +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._claude_code_turn import ClaudeCodeTurn + + +def _tool_then_text_envelopes() -> list[dict[str, Any]]: + return [ + { + "type": "assistant", + "message": { + "content": [ + { + "type": "tool_use", + "id": "call_read", + "name": "Read", + "input": {"path": "/workspace/README.md"}, + } + ] + }, + }, + { + "type": "user", + "message": { + "content": [ + {"type": "tool_result", "tool_use_id": "call_read", "content": "# My Project — 72F"} + ] + }, + }, + { + "type": "assistant", + "message": {"content": [{"type": "text", "text": "The project file says 72F."}]}, + }, + {"type": "result", "usage": {"input_tokens": 50, "output_tokens": 20}, "num_turns": 2}, + ] + + +async def _aiter(envelopes: list[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]: + for e in envelopes: + yield e + + +# --------------------------------------------------------------------------- +# Fake streaming backend that records created_at +# --------------------------------------------------------------------------- + + +class _FakeCtx: + def __init__(self, ctype: str, initial_content: Any) -> None: + self.ctype = ctype + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) + + async def __aenter__(self) -> "_FakeCtx": + return self + + async def __aexit__(self, *args: Any) -> bool: + await self.close() + return False + + async def close(self) -> None: + pass + + async def stream_update(self, update: Any) -> Any: + return update + + +class _FakeStreaming: + def __init__(self) -> None: + self.messages_opened: list[Any] = [] + self.created_ats: list[Any] = [] + + def streaming_task_message_context( + self, + task_id: str, + initial_content: Any, + streaming_mode: str = "coalesced", + created_at: Any = None, + ) -> _FakeCtx: + ctype = getattr(initial_content, "type", None) or "" + self.messages_opened.append(initial_content) + self.created_ats.append(created_at) + return _FakeCtx(ctype, initial_content) + + +async def _run_activity( + envelopes: list[dict[str, Any]], created_at: datetime | None +) -> tuple[Any, _FakeStreaming]: + fake_streaming = _FakeStreaming() + turn = ClaudeCodeTurn(_aiter(envelopes)) + emitter = UnifiedEmitter( + task_id="task1", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn, created_at=created_at) + return result, fake_streaming + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestTemporalActivityDelivery: + async def test_canonical_sequence_delivered(self) -> None: + _, fake_streaming = await _run_activity(_tool_then_text_envelopes(), created_at=None) + types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] + assert "tool_request" in types + assert "tool_response" in types + assert types.index("tool_request") < types.index("tool_response") + assert types[-1] == "text" + + async def test_tool_round_trip_keyed_correctly(self) -> None: + _, fake_streaming = await _run_activity(_tool_then_text_envelopes(), created_at=None) + tool_req = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolRequestContent)) + tool_resp = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolResponseContent)) + assert tool_req.tool_call_id == tool_resp.tool_call_id == "call_read" + + async def test_final_text_and_usage(self) -> None: + result, _ = await _run_activity(_tool_then_text_envelopes(), created_at=None) + assert result.final_text == "The project file says 72F." + assert result.usage.input_tokens == 50 + assert result.usage.num_llm_calls == 2 + + +class TestTemporalCreatedAtThreading: + async def test_created_at_threaded_to_all_contexts(self) -> None: + fixed = datetime(2026, 6, 22, 12, 0, 0, tzinfo=timezone.utc) + _, fake_streaming = await _run_activity(_tool_then_text_envelopes(), created_at=fixed) + assert len(fake_streaming.created_ats) == len(fake_streaming.messages_opened) + assert all(ts == fixed for ts in fake_streaming.created_ats), ( + f"Expected every context stamped with {fixed}, got {fake_streaming.created_ats}" + ) + + async def test_default_created_at_is_none(self) -> None: + _, fake_streaming = await _run_activity(_tool_then_text_envelopes(), created_at=None) + assert all(ts is None for ts in fake_streaming.created_ats) + + async def test_created_at_deterministic_across_runs(self) -> None: + fixed = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + _, first = await _run_activity(_tool_then_text_envelopes(), created_at=fixed) + _, second = await _run_activity(_tool_then_text_envelopes(), created_at=fixed) + assert first.created_ats == second.created_ats diff --git a/tests/lib/core/harness/test_harness_codex_async.py b/tests/lib/core/harness/test_harness_codex_async.py new file mode 100644 index 000000000..c31ebfa49 --- /dev/null +++ b/tests/lib/core/harness/test_harness_codex_async.py @@ -0,0 +1,228 @@ +"""Integration test: async (Redis-streaming) channel with a codex turn. + +Exercises the unified harness surface (UnifiedEmitter.auto_send_turn + CodexTurn) +with hand-built codex ``exec --json`` event dicts and a fake streaming backend +so the test runs fully offline (no codex CLI subprocess, no Redis, no Agentex +server). + +Native event shapes are copied verbatim from the codex turn test / conformance +fixtures (command_execution -> tool round-trip; agent_message -> text; +turn.completed -> usage). + +What is tested +-------------- +- auto_send pushes the correct message contexts: tool_request + tool_response + + text (in that order). +- TurnResult.final_text equals the final agent_message text. +- TurnResult.usage reflects the codex ``turn.completed`` usage (input/output/ + total tokens) plus the locally-counted num_tool_calls. +- With a SpanTracer + fake tracing, a tool span is derived on the async path. + +What is NOT covered without live infrastructure +----------------------------------------------- +- Actual Redis streaming. +- The ACP on_task_event_send / on_task_create / on_task_cancel lifecycle. +- A real codex CLI subprocess / live model behaviour. + +See also: test_harness_codex_sync.py and test_harness_codex_temporal.py. +""" + +from __future__ import annotations + +from typing import Any, AsyncIterator + +from agentex.types.task_message import TaskMessage +from agentex.lib.core.harness.types import TurnResult +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._codex_turn import CodexTurn + +from ._fakes import FakeTracing + +# --------------------------------------------------------------------------- +# Native codex event fixtures +# --------------------------------------------------------------------------- + + +def _tool_then_text_events() -> list[dict[str, Any]]: + return [ + {"type": "thread.started", "thread_id": "thread-abc"}, + { + "type": "item.started", + "item": {"id": "tool1", "type": "command_execution", "command": "cat weather.txt"}, + }, + { + "type": "item.completed", + "item": { + "id": "tool1", + "type": "command_execution", + "command": "cat weather.txt", + "aggregated_output": "sunny and 72F", + "exit_code": 0, + }, + }, + {"type": "item.started", "item": {"id": "msg1", "type": "agent_message", "text": ""}}, + { + "type": "item.completed", + "item": {"id": "msg1", "type": "agent_message", "text": "The weather is sunny and 72F."}, + }, + { + "type": "turn.completed", + "usage": {"input_tokens": 20, "output_tokens": 8, "total_tokens": 28}, + }, + ] + + +async def _aiter(events: list[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]: + for e in events: + yield e + + +# --------------------------------------------------------------------------- +# Fake streaming backend +# --------------------------------------------------------------------------- + + +class _FakeCtx: + def __init__(self, sink: list[Any], ctype: str, initial_content: Any) -> None: + self.sink = sink + self.ctype = ctype + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) + + async def __aenter__(self) -> "_FakeCtx": + self.sink.append(("open", self.ctype, self.task_message.content)) + return self + + async def __aexit__(self, *args: Any) -> bool: + await self.close() + return False + + async def close(self) -> None: + self.sink.append(("close", self.ctype)) + + async def stream_update(self, update: Any) -> Any: + self.sink.append(("delta", self.ctype, update)) + return update + + +class _FakeStreaming: + def __init__(self) -> None: + self.sink: list[Any] = [] + self.messages_opened: list[Any] = [] + + def streaming_task_message_context( + self, + task_id: str, + initial_content: Any, + streaming_mode: str = "coalesced", + created_at: Any = None, + ) -> _FakeCtx: + ctype = getattr(initial_content, "type", None) or "" + self.messages_opened.append(initial_content) + return _FakeCtx(self.sink, ctype, initial_content) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _run_auto_send_turn( + events: list[dict[str, Any]], + trace_id: str | None = None, + parent_span_id: str | None = None, + fake_tracing: FakeTracing | None = None, +) -> tuple[TurnResult, _FakeStreaming]: + fake_streaming = _FakeStreaming() + tracer: SpanTracer | bool | None = None + if trace_id and fake_tracing is not None: + tracer = SpanTracer( + trace_id=trace_id, + parent_span_id=parent_span_id, + task_id="task1", + tracing=fake_tracing, + ) + + turn = CodexTurn(_aiter(events), model="o4-mini") + emitter = UnifiedEmitter( + task_id="task1", + trace_id=trace_id, + parent_span_id=parent_span_id, + tracer=tracer if tracer is not None else False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn) + return result, fake_streaming + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestAsyncAutoSendMessageOrder: + async def test_tool_request_pushed_before_tool_response(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] + assert "tool_request" in types + assert "tool_response" in types + assert types.index("tool_request") < types.index("tool_response") + + async def test_text_pushed_last(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] + assert types[-1] == "text", f"Expected last type=text, got {types}" + + +class TestAsyncAutoSendContent: + async def test_tool_response_content(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + tool_resps = [m for m in fake_streaming.messages_opened if isinstance(m, ToolResponseContent)] + assert len(tool_resps) == 1 + assert "72F" in str(tool_resps[0].content) + + async def test_tool_call_ids_match(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + tool_req = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolRequestContent)) + tool_resp = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolResponseContent)) + assert tool_req.tool_call_id == tool_resp.tool_call_id + + +class TestAsyncAutoSendFinalTextAndUsage: + async def test_final_text_matches_last_text(self) -> None: + result, _ = await _run_auto_send_turn(_tool_then_text_events()) + assert result.final_text == "The weather is sunny and 72F." + + async def test_usage_from_turn_completed(self) -> None: + """TurnResult.usage reflects the codex turn.completed usage + tool count.""" + result, _ = await _run_auto_send_turn(_tool_then_text_events()) + assert result.usage is not None + assert result.usage.input_tokens == 20 + assert result.usage.output_tokens == 8 + assert result.usage.total_tokens == 28 + assert result.usage.model == "o4-mini" + assert result.usage.num_tool_calls == 1 + assert result.usage.num_llm_calls == 1 + + async def test_context_lifecycle_open_then_close(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + opens = [e for e in fake_streaming.sink if e[0] == "open"] + closes = [e for e in fake_streaming.sink if e[0] == "close"] + assert len(opens) == len(closes) + assert len(opens) == len(fake_streaming.messages_opened) + + +class TestAsyncAutoSendSpanDerivation: + async def test_tool_span_derived_on_async_path(self) -> None: + fake_tracing = FakeTracing() + await _run_auto_send_turn( + _tool_then_text_events(), + trace_id="trace1", + parent_span_id="parent", + fake_tracing=fake_tracing, + ) + assert len(fake_tracing.started) == 1 + assert len(fake_tracing.ended) == 1 + assert "72F" in str(fake_tracing.ended[0][1]) diff --git a/tests/lib/core/harness/test_harness_codex_sync.py b/tests/lib/core/harness/test_harness_codex_sync.py new file mode 100644 index 000000000..0209e1e3d --- /dev/null +++ b/tests/lib/core/harness/test_harness_codex_sync.py @@ -0,0 +1,278 @@ +"""Integration test: sync (HTTP-yield) channel with a codex turn. + +Exercises the unified harness surface (UnifiedEmitter.yield_turn + CodexTurn) +with hand-built codex ``exec --json`` event dicts so the test runs fully offline +(no codex CLI subprocess, no API keys, no Agentex server). + +Native stream shapes +--------------------- +``CodexTurn`` consumes an async iterator of raw codex events (str | dict). The +event shapes used here are copied verbatim from the codex turn test +(tests/lib/adk/test_codex_turn.py) and the codex conformance fixtures +(tests/lib/core/harness/conformance/test_codex_conformance.py): + + command_execution item -> Start(ToolRequestContent) + Done + Full(ToolResponseContent) + agent_message item -> Start(TextContent) + ... + Full/Done + reasoning item -> Start(ReasoningContent) + Full(ReasoningContent) + turn.completed -> usage + +Reasoning note +-------------- +The codex converter emits reasoning as Start(ReasoningContent) + Full(ReasoningContent) +with NO Done event. The SpanDeriver opens a reasoning span on Start but only +closes it on a Done; with no Done, the reasoning span is closed by flush() at +end of stream (is_complete=False). This is asserted explicitly below rather than +glossed over — it is a real codex-specific quirk, not a missing channel. + +What is tested +-------------- +- The sync handler forwards StreamTaskMessage* events in canonical order: + tool_request (Start+Done) -> tool_response (Full) -> text. +- The tool_response carries the command output, keyed by item id. +- With a trace_id + fake tracing, a tool span is opened on Done(tool_request) + and closed on the matching Full(tool_response), and a reasoning span is + opened (closed-by-flush) for a reasoning item. + +What is NOT covered without live infrastructure +----------------------------------------------- +- Actual HTTP streaming over the ACP sync endpoint. +- A real codex CLI subprocess / live model behaviour. +- The full FastACP request/response lifecycle. + +See also: test_harness_codex_async.py and test_harness_codex_temporal.py. +""" + +from __future__ import annotations + +from typing import Any, AsyncIterator, override + +from agentex.lib.core.harness.types import OpenSpan, CloseSpan +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_update import ( + StreamTaskMessageFull, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._codex_turn import CodexTurn + +from ._fakes import FakeTracing + +# --------------------------------------------------------------------------- +# Native codex event fixtures (copied from the turn + conformance tests) +# --------------------------------------------------------------------------- + + +def _tool_then_text_events() -> list[dict[str, Any]]: + """A command_execution tool round-trip followed by a final text reply.""" + return [ + {"type": "thread.started", "thread_id": "thread-abc"}, + {"type": "turn.started"}, + { + "type": "item.started", + "item": {"id": "tool1", "type": "command_execution", "command": "cat weather.txt"}, + }, + { + "type": "item.completed", + "item": { + "id": "tool1", + "type": "command_execution", + "command": "cat weather.txt", + "aggregated_output": "sunny and 72F", + "exit_code": 0, + }, + }, + {"type": "item.started", "item": {"id": "msg1", "type": "agent_message", "text": ""}}, + { + "type": "item.completed", + "item": {"id": "msg1", "type": "agent_message", "text": "The weather is sunny and 72F."}, + }, + { + "type": "turn.completed", + "usage": {"input_tokens": 20, "output_tokens": 8, "total_tokens": 28}, + }, + ] + + +def _reasoning_events() -> list[dict[str, Any]]: + return [ + {"type": "thread.started", "thread_id": "thread-reason"}, + {"type": "item.started", "item": {"id": "r1", "type": "reasoning", "text": ""}}, + { + "type": "item.completed", + "item": {"id": "r1", "type": "reasoning", "text": "Step 1: analyze\nStep 2: solve"}, + }, + {"type": "item.started", "item": {"id": "msg2", "type": "agent_message", "text": ""}}, + {"type": "item.completed", "item": {"id": "msg2", "type": "agent_message", "text": "42"}}, + {"type": "turn.completed", "usage": {"input_tokens": 30, "output_tokens": 20, "total_tokens": 50}}, + ] + + +async def _aiter(events: list[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]: + for e in events: + yield e + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _run_yield_turn( + events: list[dict[str, Any]], + trace_id: str | None = None, + parent_span_id: str | None = None, + fake_tracing: FakeTracing | None = None, +) -> list[Any]: + tracer: SpanTracer | bool | None = None + if trace_id and fake_tracing is not None: + tracer = SpanTracer( + trace_id=trace_id, + parent_span_id=parent_span_id, + task_id="task1", + tracing=fake_tracing, + ) + + turn = CodexTurn(_aiter(events), model="o4-mini") + emitter = UnifiedEmitter( + task_id="task1", + trace_id=trace_id, + parent_span_id=parent_span_id, + tracer=tracer if tracer is not None else False, + ) + return [ev async for ev in emitter.yield_turn(turn)] + + +# --------------------------------------------------------------------------- +# Tests: event order and content +# --------------------------------------------------------------------------- + + +class TestSyncYieldEventOrder: + async def test_tool_request_precedes_tool_response(self) -> None: + events = await _run_yield_turn(_tool_then_text_events()) + content_types = [ + getattr(getattr(ev, "content", None), "type", None) + for ev in events + if isinstance(ev, (StreamTaskMessageStart, StreamTaskMessageFull)) + ] + assert "tool_request" in content_types + assert "tool_response" in content_types + assert content_types.index("tool_request") < content_types.index("tool_response") + + async def test_text_appears_after_tool_response(self) -> None: + events = await _run_yield_turn(_tool_then_text_events()) + tool_resp_pos = next( + i + for i, ev in enumerate(events) + if isinstance(ev, StreamTaskMessageFull) + and getattr(getattr(ev, "content", None), "type", None) == "tool_response" + ) + text_start_pos = next( + i + for i, ev in enumerate(events) + if isinstance(ev, StreamTaskMessageStart) and getattr(getattr(ev, "content", None), "type", None) == "text" + ) + assert tool_resp_pos < text_start_pos + + async def test_tool_response_carries_command_output(self) -> None: + events = await _run_yield_turn(_tool_then_text_events()) + full_responses = [ + ev.content + for ev in events + if isinstance(ev, StreamTaskMessageFull) and isinstance(getattr(ev, "content", None), ToolResponseContent) + ] + assert len(full_responses) == 1 + tool_response = full_responses[0] + assert isinstance(tool_response, ToolResponseContent) + assert "72F" in str(tool_response.content) + + async def test_tool_request_present(self) -> None: + events = await _run_yield_turn(_tool_then_text_events()) + tool_reqs = [ + ev.content for ev in events if isinstance(getattr(ev, "content", None), ToolRequestContent) + ] + assert len(tool_reqs) == 1 + + +# --------------------------------------------------------------------------- +# Tests: span derivation on the yield path +# --------------------------------------------------------------------------- + + +class TestSyncYieldSpanDerivation: + async def test_tool_span_opened_and_closed(self) -> None: + """Done(tool_request) opens a tool span; Full(tool_response) closes it.""" + fake_tracing = FakeTracing() + await _run_yield_turn( + _tool_then_text_events(), + trace_id="trace1", + parent_span_id="parent-span", + fake_tracing=fake_tracing, + ) + assert len(fake_tracing.started) == 1 + assert len(fake_tracing.ended) == 1 + _name, parent_id, _input = fake_tracing.started[0] + assert parent_id == "parent-span" + + async def test_tool_span_output_is_command_output(self) -> None: + fake_tracing = FakeTracing() + await _run_yield_turn( + _tool_then_text_events(), + trace_id="trace1", + parent_span_id="parent-span", + fake_tracing=fake_tracing, + ) + _name, output = fake_tracing.ended[0] + assert "72F" in str(output) + + async def test_reasoning_span_opened_then_flush_closed(self) -> None: + """A codex reasoning item emits Start+Full (no Done): the reasoning span + opens and is closed by flush() at end of stream (is_complete=False).""" + received_signals: list[Any] = [] + + class _RecordingTracer(SpanTracer): + @override + async def handle(self, signal: Any) -> None: + received_signals.append(signal) + await super().handle(signal) + + fake_tracing = FakeTracing() + tracer = _RecordingTracer( + trace_id="trace1", + parent_span_id="parent-span", + task_id="task1", + tracing=fake_tracing, + ) + turn = CodexTurn(_aiter(_reasoning_events()), model="o4-mini") + emitter = UnifiedEmitter(task_id="task1", trace_id="trace1", parent_span_id="parent-span", tracer=tracer) + [_ async for _ in emitter.yield_turn(turn)] + + opens = [s for s in received_signals if isinstance(s, OpenSpan) and s.kind == "reasoning"] + closes = [s for s in received_signals if isinstance(s, CloseSpan) and str(s.key).startswith("reasoning:")] + assert len(opens) == 1, "Reasoning Start must open exactly one reasoning span" + assert len(closes) == 1, "Reasoning span must be closed (by flush) at end of stream" + assert closes[0].is_complete is False, "No Done event, so the reasoning span is flush-closed as incomplete" + + async def test_no_trace_id_means_no_spans(self) -> None: + fake_tracing = FakeTracing() + turn = CodexTurn(_aiter(_tool_then_text_events()), model="o4-mini") + emitter = UnifiedEmitter(task_id="task1", trace_id=None, parent_span_id=None, tracing=fake_tracing) + [_ async for _ in emitter.yield_turn(turn)] + assert fake_tracing.started == [] + assert fake_tracing.ended == [] + + async def test_tracer_false_suppresses_spans(self) -> None: + fake_tracing = FakeTracing() + turn = CodexTurn(_aiter(_tool_then_text_events()), model="o4-mini") + emitter = UnifiedEmitter( + task_id="task1", + trace_id="trace1", + parent_span_id="parent-span", + tracer=False, + tracing=fake_tracing, + ) + [_ async for _ in emitter.yield_turn(turn)] + assert fake_tracing.started == [] + assert fake_tracing.ended == [] diff --git a/tests/lib/core/harness/test_harness_codex_temporal.py b/tests/lib/core/harness/test_harness_codex_temporal.py new file mode 100644 index 000000000..0af0b862b --- /dev/null +++ b/tests/lib/core/harness/test_harness_codex_temporal.py @@ -0,0 +1,180 @@ +"""Integration test: Temporal channel with a codex turn, offline. + +The codex tap is a pure library adapter (subprocess/sandbox provisioning lives +in the golden agent; there is no codex-specific temporal helper like langgraph's +``stream_langgraph_events``). In a Temporal deployment the codex CLI runs inside +a Temporal activity and the resulting canonical stream is delivered via the SAME +``UnifiedEmitter.auto_send_turn`` path used by the non-temporal async channel. +The only temporal-specific concern at the harness boundary is that the activity +stamps messages with a deterministic ``created_at`` (e.g. ``workflow.now()``) +for replay determinism. + +This suite exercises the auto_send path inside an activity-style call plus the +temporal-only contract: ``created_at`` is threaded through to every streaming +context. The native codex event shapes are copied verbatim from the codex turn +test / conformance fixtures. + +What is tested +-------------- +- The canonical message sequence (tool_request -> tool_response -> text) is + delivered via auto_send_turn, exactly as inside a Temporal activity. +- ``created_at`` passed to ``auto_send_turn`` is forwarded to every + ``streaming_task_message_context`` call (deterministic timestamping). +- Final text + usage from turn.completed are returned. + +What is NOT covered without live infrastructure +----------------------------------------------- +- Temporal scheduling / durability / replay behaviour. +- Redis streaming (requires a running Redis instance). +- A real codex CLI subprocess / live model behaviour. + +See also: test_harness_codex_sync.py and test_harness_codex_async.py. +""" + +from __future__ import annotations + +from typing import Any, AsyncIterator +from datetime import datetime, timezone + +from agentex.types.task_message import TaskMessage +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._codex_turn import CodexTurn + + +def _tool_then_text_events() -> list[dict[str, Any]]: + return [ + {"type": "thread.started", "thread_id": "thread-abc"}, + { + "type": "item.started", + "item": {"id": "tool1", "type": "command_execution", "command": "cat weather.txt"}, + }, + { + "type": "item.completed", + "item": { + "id": "tool1", + "type": "command_execution", + "command": "cat weather.txt", + "aggregated_output": "sunny and 72F", + "exit_code": 0, + }, + }, + {"type": "item.started", "item": {"id": "msg1", "type": "agent_message", "text": ""}}, + { + "type": "item.completed", + "item": {"id": "msg1", "type": "agent_message", "text": "The weather is sunny and 72F."}, + }, + { + "type": "turn.completed", + "usage": {"input_tokens": 20, "output_tokens": 8, "total_tokens": 28}, + }, + ] + + +async def _aiter(events: list[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]: + for e in events: + yield e + + +# --------------------------------------------------------------------------- +# Fake streaming backend that records created_at +# --------------------------------------------------------------------------- + + +class _FakeCtx: + def __init__(self, ctype: str, initial_content: Any) -> None: + self.ctype = ctype + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) + + async def __aenter__(self) -> "_FakeCtx": + return self + + async def __aexit__(self, *args: Any) -> bool: + await self.close() + return False + + async def close(self) -> None: + pass + + async def stream_update(self, update: Any) -> Any: + return update + + +class _FakeStreaming: + def __init__(self) -> None: + self.messages_opened: list[Any] = [] + self.created_ats: list[Any] = [] + + def streaming_task_message_context( + self, + task_id: str, + initial_content: Any, + streaming_mode: str = "coalesced", + created_at: Any = None, + ) -> _FakeCtx: + ctype = getattr(initial_content, "type", None) or "" + self.messages_opened.append(initial_content) + self.created_ats.append(created_at) + return _FakeCtx(ctype, initial_content) + + +async def _run_activity(events: list[dict[str, Any]], created_at: datetime | None) -> tuple[Any, _FakeStreaming]: + fake_streaming = _FakeStreaming() + turn = CodexTurn(_aiter(events), model="o4-mini") + emitter = UnifiedEmitter( + task_id="task1", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn, created_at=created_at) + return result, fake_streaming + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestTemporalActivityDelivery: + async def test_canonical_sequence_delivered(self) -> None: + _, fake_streaming = await _run_activity(_tool_then_text_events(), created_at=None) + types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] + assert "tool_request" in types + assert "tool_response" in types + assert types.index("tool_request") < types.index("tool_response") + assert types[-1] == "text" + + async def test_tool_round_trip_keyed_correctly(self) -> None: + _, fake_streaming = await _run_activity(_tool_then_text_events(), created_at=None) + tool_req = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolRequestContent)) + tool_resp = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolResponseContent)) + assert tool_req.tool_call_id == tool_resp.tool_call_id + + async def test_final_text_and_usage(self) -> None: + result, _ = await _run_activity(_tool_then_text_events(), created_at=None) + assert result.final_text == "The weather is sunny and 72F." + assert result.usage.total_tokens == 28 + assert result.usage.num_tool_calls == 1 + + +class TestTemporalCreatedAtThreading: + async def test_created_at_threaded_to_all_contexts(self) -> None: + fixed = datetime(2026, 6, 22, 12, 0, 0, tzinfo=timezone.utc) + _, fake_streaming = await _run_activity(_tool_then_text_events(), created_at=fixed) + assert len(fake_streaming.created_ats) == len(fake_streaming.messages_opened) + assert all(ts == fixed for ts in fake_streaming.created_ats), ( + f"Expected every context stamped with {fixed}, got {fake_streaming.created_ats}" + ) + + async def test_default_created_at_is_none(self) -> None: + _, fake_streaming = await _run_activity(_tool_then_text_events(), created_at=None) + assert all(ts is None for ts in fake_streaming.created_ats) + + async def test_created_at_deterministic_across_runs(self) -> None: + fixed = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + _, first = await _run_activity(_tool_then_text_events(), created_at=fixed) + _, second = await _run_activity(_tool_then_text_events(), created_at=fixed) + assert first.created_ats == second.created_ats diff --git a/tests/lib/core/harness/test_harness_openai_async.py b/tests/lib/core/harness/test_harness_openai_async.py new file mode 100644 index 000000000..1329b94b9 --- /dev/null +++ b/tests/lib/core/harness/test_harness_openai_async.py @@ -0,0 +1,305 @@ +"""Integration test: async (Redis-streaming) channel with an OpenAI-agents turn. + +Exercises the unified harness surface (UnifiedEmitter.auto_send_turn + OpenAITurn) +with hand-built canonical StreamTaskMessage* streams and a fake streaming +backend so the test runs fully offline (no API keys, no Redis, no Agentex +server). + +The canonical event shapes are copied from the OpenAI converter contract +(see tests/lib/core/harness/conformance/test_openai_conformance.py): tool calls +are Full(ToolRequestContent) + Full(ToolResponseContent); text is +Start+Delta+Done. + +What is tested +-------------- +- auto_send pushes the correct message contexts to the fake streaming backend: + tool_request + tool_response + text (in that order). +- TurnResult.final_text equals the accumulated text deltas. +- TurnResult carries a TurnUsage; via the OpenAITurn result/converter path the + aggregated token usage (input/output/total + num_llm_calls) is surfaced in + TurnResult.usage. +- With a SpanTracer + fake tracing, a tool span is derived on the async path. + +What is NOT covered without live infrastructure +----------------------------------------------- +- Actual Redis streaming. +- The ACP on_task_event_send / on_task_create / on_task_cancel lifecycle. +- A real Runner.run_streamed execution / live OpenAI model behaviour. + +See also: test_harness_openai_sync.py and test_harness_openai_temporal.py. +""" + +from __future__ import annotations + +from typing import Any + +import pytest +from agents.usage import Usage + +from agentex.types.text_delta import TextDelta +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.types import TurnResult, StreamTaskMessage +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._openai_turn import OpenAITurn + +from ._fakes import FakeTracing + +# --------------------------------------------------------------------------- +# Canonical event fixtures (copied from the OpenAI converter contract) +# --------------------------------------------------------------------------- + + +def _tool_then_text_events() -> list[StreamTaskMessage]: + return [ + StreamTaskMessageFull( + type="full", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_1", + name="get_weather", + arguments={"city": "Paris"}, + ), + ), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="call_1", + name="get_weather", + content="The weather in Paris is sunny and 72F", + ), + ), + StreamTaskMessageStart( + type="start", + index=2, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta(type="delta", index=2, delta=TextDelta(type="text", text_delta="Sunny ")), + StreamTaskMessageDelta(type="delta", index=2, delta=TextDelta(type="text", text_delta="and 72F.")), + StreamTaskMessageDone(type="done", index=2), + ] + + +async def _canonical_stream(events: list[StreamTaskMessage]): + for e in events: + yield e + + +# --------------------------------------------------------------------------- +# Fake streaming backend (replaces adk.streaming; no Redis required) +# --------------------------------------------------------------------------- + + +class _FakeCtx: + def __init__(self, sink: list[Any], ctype: str, initial_content: Any) -> None: + self.sink = sink + self.ctype = ctype + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) + + async def __aenter__(self) -> "_FakeCtx": + self.sink.append(("open", self.ctype, self.task_message.content)) + return self + + async def __aexit__(self, *args: Any) -> bool: + await self.close() + return False + + async def close(self) -> None: + self.sink.append(("close", self.ctype)) + + async def stream_update(self, update: Any) -> Any: + self.sink.append(("delta", self.ctype, update)) + return update + + +class _FakeStreaming: + def __init__(self) -> None: + self.sink: list[Any] = [] + self.messages_opened: list[Any] = [] + + def streaming_task_message_context( + self, + task_id: str, + initial_content: Any, + streaming_mode: str = "coalesced", + created_at: Any = None, + ) -> _FakeCtx: + ctype = getattr(initial_content, "type", None) or "" + self.messages_opened.append(initial_content) + return _FakeCtx(self.sink, ctype, initial_content) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _run_auto_send_turn( + events: list[StreamTaskMessage], + trace_id: str | None = None, + parent_span_id: str | None = None, + fake_tracing: FakeTracing | None = None, +) -> tuple[TurnResult, _FakeStreaming]: + fake_streaming = _FakeStreaming() + tracer: SpanTracer | bool | None = None + if trace_id and fake_tracing is not None: + tracer = SpanTracer( + trace_id=trace_id, + parent_span_id=parent_span_id, + task_id="task1", + tracing=fake_tracing, + ) + + turn = OpenAITurn(stream=_canonical_stream(events), model="gpt-4o") + emitter = UnifiedEmitter( + task_id="task1", + trace_id=trace_id, + parent_span_id=parent_span_id, + tracer=tracer if tracer is not None else False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn) + return result, fake_streaming + + +# --------------------------------------------------------------------------- +# Tests: message order and content +# --------------------------------------------------------------------------- + + +class TestAsyncAutoSendMessageOrder: + async def test_tool_request_pushed_before_tool_response(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + message_types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] + assert "tool_request" in message_types + assert message_types.index("tool_request") < message_types.index("tool_response") + + async def test_text_pushed_last(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + message_types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] + assert message_types[-1] == "text", f"Expected last message type=text, got {message_types}" + + async def test_exactly_three_messages(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + assert len(fake_streaming.messages_opened) == 3, ( + f"Expected 3 messages, got {[getattr(m, 'type', None) for m in fake_streaming.messages_opened]}" + ) + + +class TestAsyncAutoSendContentVerification: + async def test_tool_request_content(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + tool_reqs = [m for m in fake_streaming.messages_opened if isinstance(m, ToolRequestContent)] + assert len(tool_reqs) == 1 + assert tool_reqs[0].name == "get_weather" + + async def test_tool_response_content(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + tool_resps = [m for m in fake_streaming.messages_opened if isinstance(m, ToolResponseContent)] + assert len(tool_resps) == 1 + assert "72F" in str(tool_resps[0].content) + assert tool_resps[0].name == "get_weather" + + async def test_tool_call_ids_match(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + tool_req = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolRequestContent)) + tool_resp = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolResponseContent)) + assert tool_req.tool_call_id == tool_resp.tool_call_id + + +class TestAsyncAutoSendFinalTextAndUsage: + async def test_final_text_matches_deltas(self) -> None: + result, _ = await _run_auto_send_turn(_tool_then_text_events()) + assert result.final_text == "Sunny and 72F." + + async def test_turn_result_has_usage(self) -> None: + """An injected canonical stream has no run to read usage from, so usage + carries only the model name (input_tokens stays None).""" + result, _ = await _run_auto_send_turn(_tool_then_text_events()) + assert result.usage is not None + assert result.usage.model == "gpt-4o" + + async def test_context_lifecycle_open_then_close(self) -> None: + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_events()) + opens = [e for e in fake_streaming.sink if e[0] == "open"] + closes = [e for e in fake_streaming.sink if e[0] == "close"] + assert len(opens) == len(closes) == 3 + + async def test_usage_populated_from_result_path(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Via the OpenAITurn result/converter path, aggregated token usage is + surfaced on TurnResult.usage after the stream is consumed. + + Mirrors the OpenAI turn test: a fake RunResultStreaming exposes + raw_responses with a Usage, and the converter is monkeypatched to a + passthrough so the canonical text stream is delivered while usage is read + from raw_responses. + """ + import agentex.lib.adk._modules._openai_turn as turn_mod + + canonical: list[StreamTaskMessage] = [ + StreamTaskMessageStart( + type="start", index=0, content=TextContent(type="text", author="agent", content="") + ), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="hi")), + StreamTaskMessageDone(type="done", index=0), + ] + + class _FakeResult: + def __init__(self) -> None: + self.raw_responses = [ + type("R", (), {"usage": Usage(requests=2, input_tokens=8, output_tokens=4, total_tokens=12)})() + ] + + def stream_events(self): # type: ignore[no-untyped-def] + return _canonical_stream(canonical) + + async def _passthrough(stream): # type: ignore[no-untyped-def] + async for e in stream: + yield e + + monkeypatch.setattr(turn_mod, "convert_openai_to_agentex_events", _passthrough) + + turn = OpenAITurn(result=_FakeResult(), model="gpt-4o") + emitter = UnifiedEmitter( + task_id="task1", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=_FakeStreaming(), + ) + result = await emitter.auto_send_turn(turn) + + assert result.final_text == "hi" + assert result.usage.model == "gpt-4o" + assert result.usage.num_llm_calls == 2 + assert result.usage.input_tokens == 8 + assert result.usage.output_tokens == 4 + assert result.usage.total_tokens == 12 + + +class TestAsyncAutoSendSpanDerivation: + async def test_tool_span_derived_on_async_path(self) -> None: + fake_tracing = FakeTracing() + await _run_auto_send_turn( + _tool_then_text_events(), + trace_id="trace1", + parent_span_id="parent", + fake_tracing=fake_tracing, + ) + assert len(fake_tracing.started) == 1 + assert fake_tracing.started[0][0] == "get_weather" + assert len(fake_tracing.ended) == 1 diff --git a/tests/lib/core/harness/test_harness_openai_sync.py b/tests/lib/core/harness/test_harness_openai_sync.py new file mode 100644 index 000000000..34a9b72c6 --- /dev/null +++ b/tests/lib/core/harness/test_harness_openai_sync.py @@ -0,0 +1,323 @@ +"""Integration test: sync (HTTP-yield) channel with an OpenAI-agents turn. + +Exercises the unified harness surface (UnifiedEmitter.yield_turn + OpenAITurn) +with hand-built canonical StreamTaskMessage* streams so the test runs fully +offline (no API keys, no live OpenAI Agents run, no Agentex server). + +Why an injected canonical stream +-------------------------------- +OpenAI's native ``RunResultStreaming`` events are heavy SDK objects; the +``OpenAITurn`` accepts a pre-built canonical ``stream=`` of StreamTaskMessage* +events that bypasses ``convert_openai_to_agentex_events``. The shapes used here +are copied verbatim from the OpenAI converter contract exercised by +``tests/lib/core/harness/conformance/test_openai_conformance.py`` (tool calls +are Full(ToolRequestContent) + Full(ToolResponseContent); reasoning is +Start(ReasoningContent) + Delta + Done). This keeps the canonical stream +faithful to what the live converter produces while staying offline. + +What is tested +-------------- +- The sync handler forwards StreamTaskMessage* events verbatim in canonical + order: tool_request (Full) -> tool_response (Full) -> text (Start+Delta+Done). +- Final accumulated text equals the seeded text deltas. +- With a trace_id + fake tracing, a tool span is opened (OpenSpan) on + Full(ToolRequestContent) and closed (CloseSpan) on the matching + Full(ToolResponseContent), and a reasoning span is opened/closed for a + reasoning segment — proving the SpanDeriver is wired on the yield path. + +What is NOT covered without live infrastructure +----------------------------------------------- +- Actual HTTP streaming over the ACP sync endpoint. +- A real ``Runner.run_streamed`` execution / live OpenAI model behaviour. +- ``convert_openai_to_agentex_events`` over real SDK events (covered by the + OpenAI turn + conformance suites). + +See also: test_harness_openai_async.py and test_harness_openai_temporal.py. +""" + +from __future__ import annotations + +from typing import Any, override + +from agentex.types.text_delta import TextDelta +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.types import OpenSpan, CloseSpan, StreamTaskMessage +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.types.reasoning_content import ReasoningContent +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._openai_turn import OpenAITurn +from agentex.types.reasoning_content_delta import ReasoningContentDelta + +from ._fakes import FakeTracing + +# --------------------------------------------------------------------------- +# Canonical event fixtures (copied from the OpenAI converter contract) +# --------------------------------------------------------------------------- + + +def _tool_then_text_events() -> list[StreamTaskMessage]: + """A tool round-trip followed by a final text reply. + + Mirrors the OpenAI converter's tool path: a Full(ToolRequestContent) for the + call and a Full(ToolResponseContent) for the result (matched by tool_call_id), + then a streamed text answer. + """ + return [ + StreamTaskMessageFull( + type="full", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_1", + name="get_weather", + arguments={"city": "Paris"}, + ), + ), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="call_1", + name="get_weather", + content="The weather in Paris is sunny and 72F", + ), + ), + StreamTaskMessageStart( + type="start", + index=2, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta(type="delta", index=2, delta=TextDelta(type="text", text_delta="Sunny ")), + StreamTaskMessageDelta(type="delta", index=2, delta=TextDelta(type="text", text_delta="and 72F.")), + StreamTaskMessageDone(type="done", index=2), + ] + + +def _reasoning_events() -> list[StreamTaskMessage]: + """A reasoning segment: Start(ReasoningContent) + Delta + Done.""" + return [ + StreamTaskMessageStart( + type="start", + index=0, + content=ReasoningContent(type="reasoning", author="agent", summary=["Thinking..."]), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningContentDelta(type="reasoning_content", content_index=0, content_delta="step 1"), + ), + StreamTaskMessageDone(type="done", index=0), + ] + + +async def _canonical_stream(events: list[StreamTaskMessage]): + for e in events: + yield e + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _run_yield_turn( + events: list[StreamTaskMessage], + trace_id: str | None = None, + parent_span_id: str | None = None, + fake_tracing: FakeTracing | None = None, +) -> list[Any]: + """Drive the sync (yield) path and collect all yielded events.""" + tracer: SpanTracer | bool | None = None + if trace_id and fake_tracing is not None: + tracer = SpanTracer( + trace_id=trace_id, + parent_span_id=parent_span_id, + task_id="task1", + tracing=fake_tracing, + ) + + turn = OpenAITurn(stream=_canonical_stream(events), model="gpt-4o") + emitter = UnifiedEmitter( + task_id="task1", + trace_id=trace_id, + parent_span_id=parent_span_id, + tracer=tracer if tracer is not None else False, + ) + return [ev async for ev in emitter.yield_turn(turn)] + + +# --------------------------------------------------------------------------- +# Tests: event order and content +# --------------------------------------------------------------------------- + + +class TestSyncYieldEventOrder: + async def test_tool_request_precedes_tool_response(self) -> None: + events = await _run_yield_turn(_tool_then_text_events()) + content_types = [ + getattr(getattr(ev, "content", None), "type", None) + for ev in events + if isinstance(ev, (StreamTaskMessageStart, StreamTaskMessageFull)) + ] + assert "tool_request" in content_types + assert "tool_response" in content_types + assert content_types.index("tool_request") < content_types.index("tool_response") + + async def test_text_appears_after_tool_response(self) -> None: + events = await _run_yield_turn(_tool_then_text_events()) + tool_resp_pos = next( + i + for i, ev in enumerate(events) + if isinstance(ev, StreamTaskMessageFull) + and getattr(getattr(ev, "content", None), "type", None) == "tool_response" + ) + text_start_pos = next( + i + for i, ev in enumerate(events) + if isinstance(ev, StreamTaskMessageStart) and getattr(getattr(ev, "content", None), "type", None) == "text" + ) + assert tool_resp_pos < text_start_pos + + async def test_tool_response_carries_weather_result(self) -> None: + events = await _run_yield_turn(_tool_then_text_events()) + full_responses = [ + ev + for ev in events + if isinstance(ev, StreamTaskMessageFull) and isinstance(getattr(ev, "content", None), ToolResponseContent) + ] + assert len(full_responses) == 1 + tool_response = full_responses[0].content + assert isinstance(tool_response, ToolResponseContent) + assert "72F" in str(tool_response.content) + assert tool_response.name == "get_weather" + + async def test_accumulated_text_matches_deltas(self) -> None: + events = await _run_yield_turn(_tool_then_text_events()) + accumulated = "".join( + ev.delta.text_delta + for ev in events + if isinstance(ev, StreamTaskMessageDelta) and isinstance(ev.delta, TextDelta) and ev.delta.text_delta + ) + assert accumulated == "Sunny and 72F." + + async def test_every_start_has_matching_done(self) -> None: + events = await _run_yield_turn(_tool_then_text_events()) + starts = {ev.index for ev in events if isinstance(ev, StreamTaskMessageStart)} + dones = {ev.index for ev in events if isinstance(ev, StreamTaskMessageDone)} + assert starts == dones, f"Unmatched Start/Done indices: starts={starts} dones={dones}" + + +# --------------------------------------------------------------------------- +# Tests: span derivation on the yield path +# --------------------------------------------------------------------------- + + +class TestSyncYieldSpanDerivation: + async def test_tool_span_opened_and_closed(self) -> None: + """Full(ToolRequestContent) opens a tool span; Full(ToolResponseContent) closes it.""" + fake_tracing = FakeTracing() + await _run_yield_turn( + _tool_then_text_events(), + trace_id="trace1", + parent_span_id="parent-span", + fake_tracing=fake_tracing, + ) + + assert len(fake_tracing.started) == 1, "Expected exactly one tool span opened" + assert len(fake_tracing.ended) == 1, "Expected exactly one tool span closed" + name, parent_id, _ = fake_tracing.started[0] + assert name == "get_weather" + assert parent_id == "parent-span" + + async def test_tool_span_output_is_tool_result(self) -> None: + fake_tracing = FakeTracing() + await _run_yield_turn( + _tool_then_text_events(), + trace_id="trace1", + parent_span_id="parent-span", + fake_tracing=fake_tracing, + ) + name, output = fake_tracing.ended[0] + assert name == "get_weather" + assert "72F" in str(output) + + async def test_reasoning_span_opened_and_closed(self) -> None: + """A reasoning segment opens and closes a reasoning span.""" + fake_tracing = FakeTracing() + await _run_yield_turn( + _reasoning_events(), + trace_id="trace1", + parent_span_id="parent-span", + fake_tracing=fake_tracing, + ) + assert fake_tracing.started_names == ["reasoning"] + assert len(fake_tracing.ended) == 1 + + async def test_no_trace_id_means_no_spans(self) -> None: + fake_tracing = FakeTracing() + turn = OpenAITurn(stream=_canonical_stream(_tool_then_text_events()), model="gpt-4o") + emitter = UnifiedEmitter( + task_id="task1", + trace_id=None, + parent_span_id=None, + tracing=fake_tracing, + ) + [_ async for _ in emitter.yield_turn(turn)] + assert fake_tracing.started == [] + assert fake_tracing.ended == [] + + async def test_tracer_false_suppresses_spans(self) -> None: + fake_tracing = FakeTracing() + turn = OpenAITurn(stream=_canonical_stream(_tool_then_text_events()), model="gpt-4o") + emitter = UnifiedEmitter( + task_id="task1", + trace_id="trace1", + parent_span_id="parent-span", + tracer=False, + tracing=fake_tracing, + ) + [_ async for _ in emitter.yield_turn(turn)] + assert fake_tracing.started == [] + assert fake_tracing.ended == [] + + async def test_span_signal_types(self) -> None: + """The signals received by the tracer are OpenSpan then CloseSpan.""" + received_signals: list[Any] = [] + + class _RecordingTracer(SpanTracer): + @override + async def handle(self, signal: Any) -> None: + received_signals.append(signal) + await super().handle(signal) + + fake_tracing = FakeTracing() + tracer = _RecordingTracer( + trace_id="trace1", + parent_span_id="parent", + task_id="task1", + tracing=fake_tracing, + ) + turn = OpenAITurn(stream=_canonical_stream(_tool_then_text_events()), model="gpt-4o") + emitter = UnifiedEmitter( + task_id="task1", + trace_id="trace1", + parent_span_id="parent", + tracer=tracer, + ) + [_ async for _ in emitter.yield_turn(turn)] + + assert len(received_signals) == 2 + assert isinstance(received_signals[0], OpenSpan) + assert isinstance(received_signals[1], CloseSpan) + assert received_signals[0].name == "get_weather" diff --git a/tests/lib/core/harness/test_harness_openai_temporal.py b/tests/lib/core/harness/test_harness_openai_temporal.py new file mode 100644 index 000000000..61cda37ef --- /dev/null +++ b/tests/lib/core/harness/test_harness_openai_temporal.py @@ -0,0 +1,195 @@ +"""Integration test: Temporal channel with an OpenAI-agents turn, offline. + +In a Temporal OpenAI deployment (see +examples/tutorials/10_async/10_temporal/120_openai_agents), the OpenAI Agents +SDK run executes inside a Temporal activity. Each turn's canonical stream is +delivered to Redis via the SAME ``UnifiedEmitter.auto_send_turn`` path used by +the non-temporal async channel — the only temporal-specific concern at the +harness boundary is that the activity stamps messages with a deterministic +``created_at`` (e.g. ``workflow.now()``) so replay is deterministic. + +There is no dedicated ``stream_openai_events`` temporal helper (unlike +langgraph's ``stream_langgraph_events``); the temporal OpenAI agent builds an +``OpenAITurn`` and calls ``auto_send_turn`` directly inside the activity. This +suite therefore exercises the auto_send path plus the temporal-only contract: +``created_at`` is threaded through to every streaming context. + +What is tested +-------------- +- The canonical message sequence (tool_request -> tool_response -> text) is + delivered via auto_send_turn, exactly as inside a Temporal activity. +- ``created_at`` passed to ``auto_send_turn`` is forwarded to every + ``streaming_task_message_context`` call (deterministic timestamping). +- Final text is returned from the turn. + +What is NOT covered without live infrastructure +----------------------------------------------- +- Temporal scheduling (workflow.signal -> activity dispatch). +- Temporal durability / replay behaviour. +- Redis streaming (requires a running Redis instance). +- A real Runner.run_streamed execution / live OpenAI model behaviour. + +See also: test_harness_openai_sync.py and test_harness_openai_async.py. +""" + +from __future__ import annotations + +from typing import Any +from datetime import datetime, timezone + +from agentex.types.text_delta import TextDelta +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.types import StreamTaskMessage +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._openai_turn import OpenAITurn + + +def _tool_then_text_events() -> list[StreamTaskMessage]: + return [ + StreamTaskMessageFull( + type="full", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_1", + name="get_weather", + arguments={"city": "Paris"}, + ), + ), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="call_1", + name="get_weather", + content="The weather in Paris is sunny and 72F", + ), + ), + StreamTaskMessageStart( + type="start", + index=2, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta(type="delta", index=2, delta=TextDelta(type="text", text_delta="Sunny ")), + StreamTaskMessageDelta(type="delta", index=2, delta=TextDelta(type="text", text_delta="and 72F.")), + StreamTaskMessageDone(type="done", index=2), + ] + + +async def _canonical_stream(events: list[StreamTaskMessage]): + for e in events: + yield e + + +# --------------------------------------------------------------------------- +# Fake streaming backend that records the created_at it receives +# --------------------------------------------------------------------------- + + +class _FakeCtx: + def __init__(self, ctype: str, initial_content: Any) -> None: + self.ctype = ctype + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) + + async def __aenter__(self) -> "_FakeCtx": + return self + + async def __aexit__(self, *args: Any) -> bool: + await self.close() + return False + + async def close(self) -> None: + pass + + async def stream_update(self, update: Any) -> Any: + return update + + +class _FakeStreaming: + def __init__(self) -> None: + self.messages_opened: list[Any] = [] + self.created_ats: list[Any] = [] + + def streaming_task_message_context( + self, + task_id: str, + initial_content: Any, + streaming_mode: str = "coalesced", + created_at: Any = None, + ) -> _FakeCtx: + ctype = getattr(initial_content, "type", None) or "" + self.messages_opened.append(initial_content) + self.created_ats.append(created_at) + return _FakeCtx(ctype, initial_content) + + +async def _run_activity(events: list[StreamTaskMessage], created_at: datetime | None) -> tuple[Any, _FakeStreaming]: + """Mirror the temporal activity body: build an OpenAITurn and auto_send it.""" + fake_streaming = _FakeStreaming() + turn = OpenAITurn(stream=_canonical_stream(events), model="gpt-4o") + emitter = UnifiedEmitter( + task_id="task1", + trace_id=None, + parent_span_id=None, + tracer=False, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn, created_at=created_at) + return result, fake_streaming + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestTemporalActivityMessageOrder: + async def test_canonical_sequence_delivered(self) -> None: + _, fake_streaming = await _run_activity(_tool_then_text_events(), created_at=None) + types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] + assert "tool_request" in types + assert "tool_response" in types + assert types.index("tool_request") < types.index("tool_response") + assert types[-1] == "text" + + async def test_final_text_returned(self) -> None: + result, _ = await _run_activity(_tool_then_text_events(), created_at=None) + assert result.final_text == "Sunny and 72F." + + +class TestTemporalCreatedAtThreading: + """created_at is forwarded to every streaming context (deterministic replay).""" + + async def test_created_at_threaded_to_all_contexts(self) -> None: + fixed = datetime(2026, 6, 22, 12, 0, 0, tzinfo=timezone.utc) + _, fake_streaming = await _run_activity(_tool_then_text_events(), created_at=fixed) + assert len(fake_streaming.created_ats) == 3 + assert all(ts == fixed for ts in fake_streaming.created_ats), ( + f"Expected every context stamped with {fixed}, got {fake_streaming.created_ats}" + ) + + async def test_default_created_at_is_none(self) -> None: + """When the activity does not stamp a timestamp, contexts see None.""" + _, fake_streaming = await _run_activity(_tool_then_text_events(), created_at=None) + assert all(ts is None for ts in fake_streaming.created_ats) + + async def test_created_at_is_deterministic_across_runs(self) -> None: + """Two runs with the same created_at stamp identical timestamps — the + determinism the Temporal channel relies on for replay.""" + fixed = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + _, first = await _run_activity(_tool_then_text_events(), created_at=fixed) + _, second = await _run_activity(_tool_then_text_events(), created_at=fixed) + assert first.created_ats == second.created_ats + assert all(ts == fixed for ts in first.created_ats) From 8a0cf6bf8c2eabc2ed6ef1e04d24125724f5dea1 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 23:53:18 +0000 Subject: [PATCH 2/2] fix(test): update codex reasoning span expectation --- .../core/harness/test_harness_codex_sync.py | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/tests/lib/core/harness/test_harness_codex_sync.py b/tests/lib/core/harness/test_harness_codex_sync.py index 0209e1e3d..6129716ee 100644 --- a/tests/lib/core/harness/test_harness_codex_sync.py +++ b/tests/lib/core/harness/test_harness_codex_sync.py @@ -18,11 +18,9 @@ Reasoning note -------------- -The codex converter emits reasoning as Start(ReasoningContent) + Full(ReasoningContent) -with NO Done event. The SpanDeriver opens a reasoning span on Start but only -closes it on a Done; with no Done, the reasoning span is closed by flush() at -end of stream (is_complete=False). This is asserted explicitly below rather than -glossed over — it is a real codex-specific quirk, not a missing channel. +The codex converter emits reasoning as Start(ReasoningContent) + deltas + Done. +The SpanDeriver opens a reasoning span on Start and closes it normally when the +Done event is observed (is_complete=True). What is tested -------------- @@ -31,7 +29,7 @@ - The tool_response carries the command output, keyed by item id. - With a trace_id + fake tracing, a tool span is opened on Done(tool_request) and closed on the matching Full(tool_response), and a reasoning span is - opened (closed-by-flush) for a reasoning item. + opened and closed normally for a reasoning item. What is NOT covered without live infrastructure ----------------------------------------------- @@ -227,9 +225,9 @@ async def test_tool_span_output_is_command_output(self) -> None: _name, output = fake_tracing.ended[0] assert "72F" in str(output) - async def test_reasoning_span_opened_then_flush_closed(self) -> None: - """A codex reasoning item emits Start+Full (no Done): the reasoning span - opens and is closed by flush() at end of stream (is_complete=False).""" + async def test_reasoning_span_opened_then_done_closed(self) -> None: + """A codex reasoning item emits Start+Delta+Done: the reasoning span + opens and is closed normally when the Done event is observed.""" received_signals: list[Any] = [] class _RecordingTracer(SpanTracer): @@ -252,8 +250,8 @@ async def handle(self, signal: Any) -> None: opens = [s for s in received_signals if isinstance(s, OpenSpan) and s.kind == "reasoning"] closes = [s for s in received_signals if isinstance(s, CloseSpan) and str(s.key).startswith("reasoning:")] assert len(opens) == 1, "Reasoning Start must open exactly one reasoning span" - assert len(closes) == 1, "Reasoning span must be closed (by flush) at end of stream" - assert closes[0].is_complete is False, "No Done event, so the reasoning span is flush-closed as incomplete" + assert len(closes) == 1, "Reasoning span must close exactly once" + assert closes[0].is_complete is True, "Done event closes the reasoning span as complete" async def test_no_trace_id_means_no_spans(self) -> None: fake_tracing = FakeTracing()