diff --git a/.github/workflows/agentex-tutorials-test.yml b/.github/workflows/agentex-tutorials-test.yml
index f19c58d4d..41b495d71 100644
--- a/.github/workflows/agentex-tutorials-test.yml
+++ b/.github/workflows/agentex-tutorials-test.yml
@@ -49,6 +49,32 @@ jobs:
           curl -LsSf https://astral.sh/uv/install.sh | sh
           echo "$HOME/.local/bin" >> $GITHUB_PATH
 
+      # Subprocess-CLI harnesses: install the relevant CLI only for the
+      # claude-code / codex tutorials (no-op for every other tutorial). npm is
+      # preinstalled on ubuntu runners. Versions mirror the golden agent's
+      # sandbox image (teams/sgp/agents/golden_agent/sandbox/Dockerfile): claude-code
+      # is pinned to the same CLAUDE_CODE_VERSION; codex is left unpinned there,
+      # so it is left unpinned here too. Bump CLAUDE_CODE_VERSION in lockstep
+      # with the sandbox Dockerfile.
+      - name: Install harness CLI (claude-code / codex only)
+        if: ${{ contains(matrix.tutorial, 'claude_code') || contains(matrix.tutorial, 'codex') }}
+        env:
+          CLAUDE_CODE_VERSION: "2.1.142"
+          # Hand the matrix value to the script through the environment instead
+          # of expanding it inline, so it is never parsed as shell source.
+          TUTORIAL: ${{ matrix.tutorial }}
+        run: |
+          if [[ "$TUTORIAL" == *claude_code* ]]; then
+            echo "📦 Installing Claude Code CLI (v${CLAUDE_CODE_VERSION})..."
+            npm install -g "@anthropic-ai/claude-code@${CLAUDE_CODE_VERSION}"
+            claude --version || true
+          fi
+          if [[ "$TUTORIAL" == *codex* ]]; then
+            echo "📦 Installing Codex CLI..."
+            npm install -g @openai/codex
+            codex --version || true
+          fi
+
       - name: Pull latest AgentEx image
         run: |
           echo "🐳 Pulling latest Scale AgentEx Docker image..."
@@ -136,6 +162,11 @@ jobs:
         working-directory: ./examples/tutorials
         env:
           OPENAI_API_KEY: ${{ secrets.TUTORIAL_OPENAI_API_KEY }}
+          ANTHROPIC_API_KEY: ${{ secrets.TUTORIAL_ANTHROPIC_API_KEY }}
+          # Enable the gated live tests only for the matching subprocess-CLI
+          # harness tutorial (the CLI is installed for it in the step above).
+          CLAUDE_LIVE_TESTS: ${{ contains(matrix.tutorial, 'claude_code') && '1' || '' }}
+          CODEX_LIVE_TESTS: ${{ contains(matrix.tutorial, 'codex') && '1' || '' }}
           HEALTH_CHECK_PORT: 8080 # Use non-privileged port for temporal worker health checks
         run: |
           echo "Testing tutorial: ${{ matrix.tutorial }}"
diff --git a/.github/workflows/harness-integration.yml b/.github/workflows/harness-integration.yml
new file mode 100644
index 000000000..51893f10f
--- /dev/null
+++ b/.github/workflows/harness-integration.yml
@@ -0,0 +1,40 @@
+name: Harness Integration
+
+on:
+  push:
+    branches: [main]
+  pull_request:
+    paths:
+      - "src/agentex/lib/core/harness/**"
+      - "src/agentex/lib/adk/_modules/**"
+      - ".github/workflows/harness-integration.yml"
+
+jobs:
+  conformance:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+
+      - name: Install uv
+        uses: astral-sh/setup-uv@d4b2f3b6ecc6e67c4457f6d3e41ec42d3d0fcb86 # v5.4.2
+        with:
+          version: '0.10.2'
+
+      - name: Bootstrap
+        run: ./scripts/bootstrap
+
+      # Defer to scripts/test so the harness suite runs under the exact same
+      # invocation as the main CI test job: DEFER_PYDANTIC_BUILD=false and
+      # `uv run --isolated --all-packages --all-extras pytest`, across the
+      # min/max supported Python versions. Running `uv run pytest` directly
+      # would risk an all-extras-only dep passing locally but failing in CI.
+      - name: Conformance suite
+        run: ./scripts/test tests/lib/core/harness/ -v
+
+  # Live integration matrix (harness x {sync, async, temporal}) is added per-harness
+  # in the migration plans. Placeholder job keeps the workflow valid until then.
+ live-matrix: + runs-on: ubuntu-latest + if: false # enabled once the first harness's test agents land + steps: + - run: echo "populated by migration PRs" # TODO(harness-migration): enable per-harness; see migration PRs 4-8 diff --git a/docs/superpowers/plans/2026-06-22-golden-agent-unified-surface-adoption.md b/docs/superpowers/plans/2026-06-22-golden-agent-unified-surface-adoption.md new file mode 100644 index 000000000..9e3eca655 --- /dev/null +++ b/docs/superpowers/plans/2026-06-22-golden-agent-unified-surface-adoption.md @@ -0,0 +1,93 @@ +# Golden Agent — Adopting the Unified Harness Surface + +Date: 2026-06-22 +Status: Plan (implementation lands in `agentex-agents`, after the SDK stack merges + releases) +SDK repo: `scale-agentex-python` · Agent repo: `agentex-agents` (`teams/sgp/agents/golden_agent`) + +## Goal + +Replace the golden agent's bespoke harness internals (its neutral `HarnessEvent` vocabulary, the `AgentexStreamAdapter`, and the per-provider stream parsers) with the now-first-class SDK unified surface — `convert_<harness>_to_agentex_events` taps + `Turn` + `UnifiedEmitter` — while keeping every SGP-coupled orchestration concern (sandbox pool, sandbox setup, secret/MCP reauth, data-plane override) exactly where it is. Net effect: the golden agent stops maintaining its own parsing/streaming/tracing layer and consumes the SDK's, which now carries the AGX1-377/378 fixes and cross-channel conformance. + +This is the payoff of the SDK harness-surface workstream (PRs: foundation #412, conformance #414, pydantic #415, openai #416, langgraph #417, claude-code #420, codex #421). The SDK *enables* the surface; this plan *consumes* it in production. + +## Current golden-agent internals (what gets replaced vs kept) + +Paths under `teams/sgp/agents/golden_agent/project/`. 
+ +| Area | File(s) | Disposition | +|------|---------|-------------| +| Neutral event vocabulary | `harness/events.py` (`HarnessEvent`) | **Delete** — superseded by the SDK canonical `StreamTaskMessage*` stream. | +| Event→adk bridge | `harness/adapter.py` (`AgentexStreamAdapter`) | **Delete** — superseded by `UnifiedEmitter` (yield + auto_send) which drives `adk.streaming`/`adk.tracing` and derives spans. | +| Provider protocol | `harness/protocol.py` (`HarnessProvider`) | **Delete or shrink** — providers no longer need to emit `HarnessEvent`; they produce the CLI's stdout stream and hand it to the SDK tap. | +| claude-code parser | `harness/providers/claude.py` `_StreamJsonProcessor` | **Delete** — replaced by SDK `convert_claude_code_to_agentex_events` + `ClaudeCodeTurn`. Keep the sandbox/CLI-spawn parts of `ClaudeProvider`. | +| codex parser | `harness/providers/codex.py` `_CodexEventProcessor` | **Delete** — replaced by SDK `convert_codex_to_agentex_events` + `CodexTurn`. Keep sandbox/CLI-spawn. | +| Turn dispatch activity | `harness/activity.py` (`execute_agent_turn`) | **Simplify** — keep provider selection + heartbeat + metrics; replace the `provider→HarnessEvent→adapter` loop with `tap → Turn → UnifiedEmitter.auto_send_turn`. | +| In-process OpenAI-Agents harness | `harness/oai_mcp.py`, `oai_hooks.py`, `oai_streaming_model.py` | **Phase 4 (optional)** — could adopt the SDK openai tap; trickiest, deferred. | +| Sandbox pool / setup / config / data-plane | `sandbox_pool.py`, `sandbox_setup.py`, `sandbox_config.py`, `sandbox_client_oai.py`, `pool_activities.py` | **Keep unchanged** (SGP-coupled; out of SDK scope). | +| Secret / MCP reauth | `secrets.py`, `internal-packages/sgp_secrets_client` | **Keep unchanged** (SGP/identity-service-coupled). 
| +| Capabilities / catalog / prompts / workflow / cron | `capabilities/*`, `prompts/*`, `workflow.py`, `cron.py`, `meta_activities.py` | **Keep unchanged.** | +| Reconnect notices | `harness/notices.py` | **Keep** — independent of the harness stream (a standalone task message). | + +## Target flow (per turn, inside the Temporal activity) + +The golden agent runs under Temporal, so delivery uses the **auto_send** channel from inside the activity (the SDK `UnifiedEmitter.auto_send_turn`, which already runs correctly in an activity). + +``` +# workflow side: capture the timestamp in workflow context and pass it in, +# because workflow.now() is NOT available inside an activity. +execute_agent_turn(ActivityParams(..., created_at=workflow.now())) + +execute_agent_turn (activity, receives created_at via ActivityParams): + 1. Acquire/reconnect sandbox (pool), resolve secrets, render MCP config # KEEP — SGP-coupled + 2. Emit sandbox-setup steps as ToolRequestContent/ToolResponseContent # KEEP — now agentex content + 3. Spawn `claude -p --output-format stream-json` / `codex exec` in sandbox # KEEP — CLI spawn + 4. turn = ClaudeCodeTurn(chain(setup_events, convert_claude_code_to_agentex_events(sandbox.stdout_lines))) # NEW — SDK tap + Turn + 5. result = await UnifiedEmitter(task_id, trace_id, parent_span_id)\ + .auto_send_turn(turn, created_at=params.created_at) # NEW — SDK delivery + 6. emit per-turn metrics from result.usage (TurnUsage) # KEEP — DogStatsD, now fed by TurnUsage +``` + +### Sandbox-setup event interleaving +Today the provider yields sandbox provisioning steps (reconnect / find / create / configure-git / clone) as `ToolStarted`/`ToolCompleted` `HarnessEvent`s that flow through the adapter so they appear in the UI + trace. 
Under the unified surface these become agent-produced `ToolRequestContent`/`ToolResponseContent` `StreamTaskMessage*` messages, **chained before** the harness tap's stream into one canonical stream for the turn (`chain(setup_events, convert_claude_code_to_agentex_events(stdout))`). `UnifiedEmitter` then delivers and traces the whole turn uniformly — setup steps keep showing in the UI and span tree. + +### Determinism / timestamps +Capture the timestamp in the **workflow** with `workflow.now()` and pass it into `execute_agent_turn` as an activity parameter (`created_at`); the activity forwards `params.created_at` to `auto_send_turn` (AGX1-378) for deterministic Temporal timestamps. Note this is NOT identical to the prior dispenser: `auto_send_turn` applies the same `created_at` to every message it opens in the turn (setup, tool, text), whereas the old dispenser stamped only the first agent message and let later messages take server time. A single per-turn timestamp can collapse the relative ordering of streamed UI messages and trace spans within a turn, so if intra-turn ordering matters, leave `created_at=None` (server time) or stamp per message. Do NOT call `workflow.now()` inside the activity (it is only valid in workflow context and raises otherwise). + +### Usage / metrics +`auto_send_turn` returns a `TurnResult` with a normalized `TurnUsage`. The golden agent's per-turn DogStatsD metrics (`metrics.py`) read from `TurnUsage` instead of the old `TurnCompleted` event — one shape for traces + metrics. + +## Phases + +**Phase 0 — Prereqs (no golden-agent code yet)** +- SDK stack merges to `next`/main and a version is released (or pin a pre-release). +- Confirm the public import path (AGX1-375): import the surface from the public `adk.*` facade if available, else `agentex.lib.core.harness` / `adk._modules`. +- Bump the golden agent's `scale-agentex` dependency to that version. 
Watch the `uv.lock` churn (commit it deliberately; see the team's lint-before-push rule). + +**Phase 1 — claude-code provider** +- In `ClaudeProvider`, keep sandbox acquisition, secret/MCP injection, and the `claude -p` spawn. Replace the `_StreamJsonProcessor` + `HarnessEvent` yielding with: produce the CLI's stdout as an async line iterator and wrap it in `ClaudeCodeTurn`. +- In `execute_agent_turn`, replace the `AgentexStreamAdapter` loop with `UnifiedEmitter(...).auto_send_turn(turn, created_at=params.created_at)` (the calling workflow passes `workflow.now()` into the activity params; see "Determinism / timestamps"); chain the sandbox-setup content before the tap stream. +- Delete `_StreamJsonProcessor`. +- Verify against the golden agent's existing turn tests + a live claude-code turn in a dev sandbox: UI streaming, tool spans, reasoning, usage/metrics all intact. + +**Phase 2 — codex provider** — same as Phase 1 with `convert_codex_to_agentex_events` + `CodexTurn`; delete `_CodexEventProcessor`. + +**Phase 3 — retire the bespoke harness layer** — delete `harness/events.py`, `harness/adapter.py`, and shrink/delete `harness/protocol.py`; simplify `harness/activity.py` to the tap→Turn→emitter shape. Confirm no remaining imports of the deleted symbols. + +**Phase 4 (optional, later) — in-process OpenAI-Agents (litellm) harness** — adopt the SDK openai tap (`convert_openai_to_agentex_events`, wrapped in a `HarnessTurn`; `OpenAITurn` lands in the openai migration PR) + `UnifiedEmitter` in place of `oai_streaming_model.py`/`oai_hooks.py`. Note `run_agent_streamed_auto_send` is the existing provider auto-send wrapper, not a tap/Turn replacement, so it is not the migration target here. This path runs the OpenAI Agents SDK in-process inside the workflow and is the most coupled to Temporal context; treat as a separate, carefully-scoped follow-up. 
+ +## Testing +- The SDK's cross-channel conformance (#414) + per-harness fixtures already prove the taps produce correct, channel-equivalent streams + spans + usage. The golden agent inherits that confidence by consuming them. +- Golden-agent side: keep its existing turn/integration tests; add a dev-sandbox live smoke per provider (claude-code, codex) asserting streamed text + tool request/response + reasoning + a well-formed span tree + non-zero `TurnUsage`. +- Regression watch: UI message shapes (text/reasoning/tool), span nesting, and per-turn metrics must match pre-migration behavior. + +## What stays SGP-side permanently (never moves to the SDK) +Per the original layering analysis: the sandbox pool + acquire modes, sandbox setup orchestration, `_override_data_plane` (ARP in-cluster routing), the sgp-secrets client, and user-scoped secret resolution / OAuth MCP reauth. The reauth refresh ideally migrates **identity-service-side** over time (so the agent becomes a dumb consumer of live tokens); the SDK only needs a generic "credential expired → emit reconnect notice" hook, not the sgp-secrets contract. + +## Risks / watch-items +- **Wire-shape parity:** the SDK `auto_send` delivers `Full` tool messages as open+close and streamed tool requests as `Start+Delta+Done` (both deliver equivalent content; AGX1-377 fixed the previously-dropped streamed shape). Validate the UI renders tool request/response identically to today. +- **Reasoning/thinking mapping:** confirm claude-code `thinking` blocks and codex reasoning map to `ReasoningContent` the same way the old adapter did (the SDK taps were ported from these exact processors, so parity is expected — verify in a live turn). +- **Heartbeat/timeout:** keep `execute_agent_turn`'s heartbeat pulse around the (now SDK-driven) consumption loop; long CLI turns must still heartbeat. +- **uv.lock / dependency bump:** pin the SDK version explicitly; deliberate lock commit. 
+ +## Sequencing summary +SDK PRs merge + release → bump golden agent dependency (Phase 0) → claude-code (Phase 1) → codex (Phase 2) → delete bespoke harness layer (Phase 3) → optional in-process OpenAI-Agents adoption (Phase 4). Each phase is independently shippable and reversible (the deleted code is recoverable from history until Phase 3 lands). diff --git a/src/agentex/lib/core/harness/__init__.py b/src/agentex/lib/core/harness/__init__.py new file mode 100644 index 000000000..067751d63 --- /dev/null +++ b/src/agentex/lib/core/harness/__init__.py @@ -0,0 +1,30 @@ +"""Shared, harness-independent machinery for the unified harness surface. + +The Agentex StreamTaskMessage* stream is the single source of truth; this +package derives spans from it and delivers it (yield or auto-send), so every +harness tap gets streaming + tracing + turn usage uniformly. +""" + +from agentex.lib.core.harness.types import ( + OpenSpan, + CloseSpan, + TurnUsage, + SpanSignal, + TurnResult, + HarnessTurn, + StreamTaskMessage, +) +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter + +__all__ = [ + "UnifiedEmitter", + "SpanTracer", + "OpenSpan", + "CloseSpan", + "SpanSignal", + "StreamTaskMessage", + "TurnUsage", + "TurnResult", + "HarnessTurn", +] diff --git a/src/agentex/lib/core/harness/auto_send.py b/src/agentex/lib/core/harness/auto_send.py new file mode 100644 index 000000000..2ecd6b583 --- /dev/null +++ b/src/agentex/lib/core/harness/auto_send.py @@ -0,0 +1,156 @@ +"""Auto-send delivery: canonical stream -> adk.streaming side effects + tracing.""" + +from __future__ import annotations + +from typing import Any, AsyncIterator +from datetime import datetime + +from agentex.types.text_delta import TextDelta +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.types import TurnUsage, TurnResult, StreamTaskMessage +from agentex.lib.core.harness.tracer import SpanTracer +from 
agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.lib.core.harness.span_derivation import SpanDeriver + +try: + from agentex.lib.utils.logging import make_logger + + logger = make_logger(__name__) +except Exception: # ddtrace may be absent in some envs; fall back to stdlib + import logging + + logger = logging.getLogger(__name__) + + +async def auto_send( + events: AsyncIterator[StreamTaskMessage], + task_id: str, + tracer: SpanTracer | None = None, + streaming: Any = None, + usage: TurnUsage | None = None, + created_at: datetime | None = None, +) -> TurnResult: + """Push the canonical stream to the task stream via adk.streaming. + + Opens a streaming context per message (keyed by index), streams deltas via + ctx.stream_update, and closes via ctx.close() on Done. Posts tool + request/response full messages by opening a context with the content and + closing it immediately (no deltas). Derives and traces spans from the same + stream. Returns the last text segment's text + usage. + + Index-keyed routing: each Start(index=i) opens a context stored in + ctx_map[i]; Delta(index=i) routes to ctx_map.get(i); Done(index=i) closes + and removes ctx_map[i]. Events with index is None are skipped. The finally + block closes all remaining open contexts. + + final_text last-segment semantics: a new Start(TextContent) resets + final_text_parts so that multi-step turns return the LAST text segment. + Full(TextContent) also overwrites final_text_parts (same semantics). + + AGX1-378: created_at is forwarded to every streaming_task_message_context + call so callers can back-date message timestamps. 
+ + Mirrors the open/close/stream_update pattern from + src/agentex/lib/adk/_modules/_langgraph_async.py: + - context opened via streaming_task_message_context(...).__aenter__() + - context closed via ctx.close() (not __aexit__) + - deltas pushed as StreamTaskMessageDelta with parent_task_message set + from ctx.task_message + + For async + temporal agents (call from inside an activity). + """ + if streaming is None: + from agentex.lib import adk + + streaming = adk.streaming + + deriver = SpanDeriver() if tracer is not None else None + final_text_parts: list[str] = [] + ctx_map: dict[int, Any] = {} + + async def _close_all() -> None: + # Guard each close independently: a failure on one context (e.g. a + # backend hiccup during teardown) must not abandon the remaining open + # contexts, otherwise their task messages would never be finalized. + for ctx in list(ctx_map.values()): + try: + await ctx.close() + except Exception as exc: + logger.warning("[harness.auto_send] context close failed during teardown: %s", exc) + ctx_map.clear() + + try: + async for event in events: + if deriver is not None and tracer is not None: + for signal in deriver.observe(event): + await tracer.handle(signal) + + if isinstance(event, StreamTaskMessageStart): + if event.index is None: + continue + i = event.index + # Reset final_text_parts when a new text segment starts + if isinstance(event.content, TextContent): + final_text_parts = [] + ctx = streaming.streaming_task_message_context( + task_id=task_id, + initial_content=event.content, + created_at=created_at, + ) + ctx_map[i] = await ctx.__aenter__() + + elif isinstance(event, StreamTaskMessageDelta): + if event.index is None: + continue + ctx = ctx_map.get(event.index) + if ctx is not None and event.delta is not None: + # Reconstruct the delta with parent_task_message set from + # the context's task_message (mirrors _langgraph_async.py + # lines 72-78 and 117-127). 
+ delta_with_parent = StreamTaskMessageDelta( + parent_task_message=ctx.task_message, + delta=event.delta, + type="delta", + index=event.index, + ) + await ctx.stream_update(delta_with_parent) + if isinstance(event.delta, TextDelta) and event.delta.text_delta: + final_text_parts.append(event.delta.text_delta) + + elif isinstance(event, StreamTaskMessageDone): + if event.index is None: + continue + ctx = ctx_map.pop(event.index, None) + if ctx is not None: + await ctx.close() + + elif isinstance(event, StreamTaskMessageFull): + # Full messages: post the full message by opening a context + # with the content and closing it immediately (no deltas; + # StreamingTaskMessageContext.close() persists initial_content + # when the accumulator is empty). Use async with so the context + # is closed even if close() raises (__aexit__ delegates to + # close()). + # Full(TextContent) also resets final_text_parts for + # last-segment semantics. + if isinstance(event.content, TextContent): + final_text_parts = [event.content.content] + async with streaming.streaming_task_message_context( + task_id=task_id, + initial_content=event.content, + created_at=created_at, + ): + pass + + finally: + await _close_all() + if deriver is not None and tracer is not None: + for signal in deriver.flush(): + await tracer.handle(signal) + + return TurnResult(final_text="".join(final_text_parts), usage=usage or TurnUsage()) diff --git a/src/agentex/lib/core/harness/emitter.py b/src/agentex/lib/core/harness/emitter.py new file mode 100644 index 000000000..5b56793bf --- /dev/null +++ b/src/agentex/lib/core/harness/emitter.py @@ -0,0 +1,80 @@ +"""UnifiedEmitter: the single facade agent authors use for either delivery mode.""" + +from __future__ import annotations + +from typing import AsyncGenerator +from datetime import datetime + +from agentex.lib.core.harness.types import TurnResult, HarnessTurn, StreamTaskMessage +from agentex.lib.core.harness.tracer import SpanTracer +from 
agentex.lib.core.harness.auto_send import auto_send +from agentex.lib.core.harness.yield_delivery import yield_events + + +class UnifiedEmitter: + """Ties trace context + chosen delivery together. + + Tracing modes (the `tracer` arg): + - tracer=None (default): auto-construct a SpanTracer if `trace_id` is present. + - tracer=False: disable tracing entirely, regardless of `trace_id`. + - tracer=: use the supplied instance. + + `tracing` and `streaming` are injection escape-hatches for tests/advanced + use; leave them None in production so the real adk modules are used. + """ + + tracer: SpanTracer | None + + def __init__( + self, + task_id: str, + trace_id: str | None, + parent_span_id: str | None, + tracer: SpanTracer | bool | None = None, + tracing: object | None = None, + streaming: object | None = None, + ): + self.task_id = task_id + self.trace_id = trace_id + self.parent_span_id = parent_span_id + self._streaming = streaming + if tracer is False: + self.tracer = None + elif isinstance(tracer, SpanTracer): + self.tracer = tracer + elif trace_id: + self.tracer = SpanTracer( + trace_id=trace_id, + parent_span_id=parent_span_id, + task_id=task_id, + tracing=tracing, + ) + else: + self.tracer = None + + async def yield_turn(self, turn: HarnessTurn) -> AsyncGenerator[StreamTaskMessage, None]: + """Sync HTTP ACP delivery: forward events, trace as side effect.""" + async for event in yield_events(turn.events, tracer=self.tracer): + yield event + + async def auto_send_turn(self, turn: HarnessTurn, created_at: datetime | None = None) -> TurnResult: + """Async/temporal delivery: push to the task stream, return TurnResult. + + Pass `created_at` (e.g. `workflow.now()` under Temporal) to stamp the + turn's messages with a deterministic timestamp; it is forwarded to the + streaming contexts. Default None preserves server-side timestamps. 
+ """ + # `turn.usage()` is only valid AFTER `turn.events` is exhausted (the + # HarnessTurn single-pass contract: real turns populate usage while the + # stream is consumed). So drive delivery first, then read usage β€” do NOT + # pass `usage=turn.usage()` eagerly here (that would capture the empty + # default before the stream runs). + result = await auto_send( + turn.events, + task_id=self.task_id, + tracer=self.tracer, + streaming=self._streaming, + created_at=created_at, + ) + result.usage = turn.usage() + return result diff --git a/src/agentex/lib/core/harness/span_derivation.py b/src/agentex/lib/core/harness/span_derivation.py new file mode 100644 index 000000000..cecb24bcc --- /dev/null +++ b/src/agentex/lib/core/harness/span_derivation.py @@ -0,0 +1,154 @@ +"""Pure reducer: canonical StreamTaskMessage* stream -> span open/close signals. + +Has no dependency on adk; unit-testable in isolation. Delivery adapters feed it +every event and act on the returned signals. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass + +from agentex.lib.core.harness.types import OpenSpan, CloseSpan, SpanSignal, StreamTaskMessage +from agentex.types.tool_request_delta import ToolRequestDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent + + +@dataclass +class _ToolReqMeta: + tool_call_id: str + name: str + arguments: dict[str, object] + args_buf: str = "" # accumulated streamed argument fragments + + +class SpanDeriver: + """Stateful reducer over the canonical stream. + + Tool span: open on Done of a ToolRequestContent index; close on matching + ToolResponseContent by tool_call_id. Reasoning span: open on + Start(ReasoningContent); close on that index's Done. 
+ + Deliberate contracts: + - A `Full(ToolResponseContent)` whose tool_call_id was never opened is + ignored (no CloseSpan emitted). + - A `Done` for an index that was never a tool_request/reasoning Start is + ignored (no signal emitted). + - Events with `index is None` are skipped entirely; without a stable index + they cannot be reliably paired, and aliasing them to a sentinel would + let unrelated None-indexed events cross-match. + - `flush()` closes anything still open as incomplete; unclosed tool spans + are emitted in the order they were opened. + """ + + def __init__(self) -> None: + self._tool_by_index: dict[int, _ToolReqMeta] = {} + self._reasoning_index_open: set[int] = set() + # insertion-ordered set of open tool_call_ids (dict keys preserve order) + self._open_tool_ids: dict[str, None] = {} + + def observe(self, event: StreamTaskMessage) -> list[SpanSignal]: + if isinstance(event, StreamTaskMessageStart): + return self._on_start(event) + if isinstance(event, StreamTaskMessageDelta): + return self._on_delta(event) + if isinstance(event, StreamTaskMessageFull): + return self._on_full(event) + if isinstance(event, StreamTaskMessageDone): + return self._on_done(event) + return [] + + def flush(self) -> list[SpanSignal]: + """Close anything still open at end of stream, marked incomplete.""" + signals: list[SpanSignal] = [] + for tcid in list(self._open_tool_ids): + signals.append(CloseSpan(key=tcid, output=None, is_complete=False)) + self._open_tool_ids.clear() + for idx in sorted(self._reasoning_index_open): + signals.append(CloseSpan(key=f"reasoning:{idx}", output=None, is_complete=False)) + self._reasoning_index_open.clear() + return signals + + def _on_start(self, event: StreamTaskMessageStart) -> list[SpanSignal]: + if event.index is None: + return [] + idx = event.index + content = event.content + if isinstance(content, ToolRequestContent): + self._tool_by_index[idx] = _ToolReqMeta( + tool_call_id=content.tool_call_id, + name=content.name, + 
arguments=dict(content.arguments or {}), + ) + return [] + if content.type == "reasoning": + self._reasoning_index_open.add(idx) + return [OpenSpan(key=f"reasoning:{idx}", kind="reasoning", name="reasoning", input={})] + return [] + + def _on_delta(self, event: StreamTaskMessageDelta) -> list[SpanSignal]: + if event.index is None: + return [] + idx = event.index + delta = event.delta + if isinstance(delta, ToolRequestDelta): + meta = self._tool_by_index.get(idx) + if meta is not None and delta.arguments_delta: + meta.args_buf += delta.arguments_delta + return [] + + def _on_full(self, event: StreamTaskMessageFull) -> list[SpanSignal]: + """Handle a Full event. + + A `Full(ToolRequestContent)` opens a tool span (keyed by tool_call_id) + if it is not already open; the matching `Full(ToolResponseContent)` + closes it. This handles harnesses (e.g. LangGraph) that emit tool calls + as a single Full rather than Start+Done. + """ + content = event.content + if isinstance(content, ToolRequestContent): + tcid = content.tool_call_id + if tcid not in self._open_tool_ids: + self._open_tool_ids[tcid] = None + args = dict(content.arguments or {}) + return [OpenSpan(key=tcid, kind="tool", name=content.name, input=args)] + return [] + if isinstance(content, ToolResponseContent): + tcid = content.tool_call_id + if tcid in self._open_tool_ids: + self._open_tool_ids.pop(tcid, None) + return [ + CloseSpan( + key=tcid, + output=content.content, + is_complete=True, + is_error=content.is_error, + ) + ] + return [] + + def _on_done(self, event: StreamTaskMessageDone) -> list[SpanSignal]: + if event.index is None: + return [] + idx = event.index + meta = self._tool_by_index.pop(idx, None) + if meta is not None: + args = meta.arguments + if meta.args_buf: + try: + args = json.loads(meta.args_buf) + except json.JSONDecodeError: + args = {"_raw": meta.args_buf} + self._open_tool_ids[meta.tool_call_id] = None + return [OpenSpan(key=meta.tool_call_id, kind="tool", name=meta.name, input=args)] + 
class SpanTracer:
    """Opens/closes adk.tracing child spans in response to span signals.

    `tracing` defaults to the real `adk.tracing` module; inject a fake in tests
    or a custom tracer to override. No-op when `trace_id` is falsy. Never raises.

    The real TracingModule.end_span does NOT accept an output kwarg — output is
    recorded by mutating span.output before calling end_span, matching the pattern
    used throughout the codebase (see _langgraph_tracing.py on_tool_end etc.).

    Span-lifecycle contract: the `_open` dict (span key -> span object) is scoped
    to a single turn. Pairing is by `key`:
      - A duplicate OpenSpan for a key already in `_open` silently replaces the
        earlier span; the earlier span is then orphaned (never closed / leaked).
      - A CloseSpan for an unknown key is a no-op.
      - Unpaired opens accumulate in `_open` for the lifetime of the tracer; since
        a tracer is expected to live for one turn, this is bounded and acceptable.
      - Once a CloseSpan has removed a span from `_open`, `end_span` is always
        attempted for it, even if recording output/error status on the span
        object fails.
    """

    def __init__(
        self,
        trace_id: str | None,
        parent_span_id: str | None,
        tracing: Any = None,
        task_id: str | None = None,
    ):
        """Bind the tracer to one trace.

        Args:
            trace_id: Trace to attach spans to; a falsy value disables the tracer.
            parent_span_id: Passed as `parent_id` to every `start_span` call.
            tracing: Object exposing async `start_span` / `end_span`. When None,
                `adk.tracing` is imported lazily and used.
            task_id: Passed as `task_id` to every `start_span` call.
        """
        self.trace_id = trace_id
        self.parent_span_id = parent_span_id
        self.task_id = task_id
        if tracing is None:
            # Imported here rather than at module level so that injecting a
            # tracer never triggers the adk import.
            from agentex.lib import adk

            tracing = adk.tracing
        self._tracing = tracing
        self._open: dict[str, Any] = {}  # span key -> span object

    async def handle(self, signal: SpanSignal) -> None:
        """Apply one span signal; failures are logged, never raised.

        OpenSpan starts a child span and remembers it under `signal.key`.
        CloseSpan pops the span stored under `signal.key` (no-op when absent),
        records output / error status on it, and ends it.
        """
        if not self.trace_id:
            return
        try:
            if isinstance(signal, OpenSpan):
                span = await self._tracing.start_span(
                    trace_id=self.trace_id,
                    name=signal.name,
                    input=signal.input,
                    parent_id=self.parent_span_id,
                    task_id=self.task_id,
                )
                if span is not None:
                    self._open[signal.key] = span
            elif isinstance(signal, CloseSpan):
                span = self._open.pop(signal.key, None)
                if span is not None:
                    # The span has already left `_open`, so from here on
                    # end_span must run no matter what; annotation problems
                    # are logged separately instead of aborting the close.
                    try:
                        # Output is recorded by mutating span.output before end_span.
                        # The real TracingModule.end_span signature is:
                        #   end_span(trace_id, span, start_to_close_timeout,
                        #            heartbeat_timeout, retry_policy)
                        # It does not accept an output= kwarg.
                        span.output = signal.output
                        # Tool failure status (ToolResponseContent.is_error) is recorded
                        # on span.data when the harness reports one; Span has no dedicated
                        # error field. None means no status was reported, so leave data alone.
                        if signal.is_error is not None:
                            # getattr: injected/fake span objects may not define
                            # `data` at all (e.g. a bare SimpleNamespace).
                            existing = getattr(span, "data", None)
                            base = existing if isinstance(existing, dict) else {}
                            span.data = {**base, "is_error": signal.is_error}
                    except Exception as exc:
                        logger.warning("[harness.tracer] span annotation failed: %s", exc)
                    await self._tracing.end_span(
                        trace_id=self.trace_id,
                        span=span,
                    )
        except Exception as exc:  # best-effort: tracing never breaks delivery
            logger.warning("[harness.tracer] span signal failed: %s", exc)
async def yield_events(
    events: AsyncIterator[StreamTaskMessage],
    tracer: SpanTracer | None = None,
) -> AsyncGenerator[StreamTaskMessage, None]:
    """Relay the canonical stream unchanged, tracing spans on the side.

    Intended for sync HTTP ACP agents that yield events back over the response.

    Args:
        events: The canonical stream to forward.
        tracer: Receives every span signal derived from the stream. When None
            the function is a pure passthrough and no deriver is created.

    Yields:
        Each event from `events`, in order, after its span signals (if any)
        have been handed to `tracer`.
    """
    if tracer is None:
        # Nothing to trace: forward verbatim.
        async for item in events:
            yield item
        return

    span_deriver = SpanDeriver()
    try:
        async for item in events:
            # Signals are handled before the event is handed to the consumer.
            for span_signal in span_deriver.observe(item):
                await tracer.handle(span_signal)
            yield item
    finally:
        # Runs on normal exhaustion, on error, and on early close: emit
        # whatever the deriver still has pending.
        for span_signal in span_deriver.flush():
            await tracer.handle(span_signal)
streaming_mode=self.streaming_mode, ).__aenter__() elif item and getattr(item, 'type', None) == 'function_call': - # Track the function call being streamed + # Open a streaming context per function call so argument + # deltas can be published incrementally. Coalescing and + # mode dispatch are handled by the streaming layer. + call_id = getattr(item, 'call_id', '') + tool_name = getattr(item, 'name', '') + call_context = await adk.streaming.streaming_task_message_context( + task_id=task_id, + initial_content=ToolRequestContent( + author="agent", + tool_call_id=call_id, + name=tool_name, + arguments={}, + ), + streaming_mode=self.streaming_mode, + ).__aenter__() function_calls_in_progress[output_index] = { 'id': getattr(item, 'id', ''), - 'call_id': getattr(item, 'call_id', ''), - 'name': getattr(item, 'name', ''), + 'call_id': call_id, + 'name': tool_name, 'arguments': getattr(item, 'arguments', ''), + 'context': call_context, } logger.debug(f"[TemporalStreamingModel] Starting function call: {item.name}") @@ -748,8 +764,24 @@ async def get_response( output_index = getattr(event, 'output_index', 0) delta = getattr(event, 'delta', '') - if output_index in function_calls_in_progress: - function_calls_in_progress[output_index]['arguments'] += delta + call_data = function_calls_in_progress.get(output_index) + if call_data is not None: + call_data['arguments'] += delta + call_context = call_data.get('context') + if call_context is not None: + try: + await call_context.stream_update(StreamTaskMessageDelta( + parent_task_message=call_context.task_message, + delta=ToolRequestDelta( + tool_call_id=call_data['call_id'], + name=call_data['name'], + arguments_delta=delta, + type="tool_request", + ), + type="delta", + )) + except Exception as e: + logger.warning(f"Failed to send tool request delta: {e}") logger.debug(f"[TemporalStreamingModel] Function call args delta: {delta[:50]}...") elif isinstance(event, ResponseFunctionCallArgumentsDoneEvent): @@ -874,6 +906,42 @@ async 
def get_response( ) output_items.append(tool_call) + # Emit the final ToolRequestContent and close the + # per-call streaming context. If the model produced + # invalid JSON args (truncation, hallucination), fall + # back to an empty dict so the streaming layer can + # still persist a message. + call_context = call_data.get('context') + if call_context is not None: + raw_args = call_data['arguments'] or '' + try: + parsed_args = json.loads(raw_args) if raw_args else {} + except json.JSONDecodeError: + logger.warning( + f"Failed to parse tool call arguments for {call_data['name']} " + f"(raw_args_bytes={len(raw_args)})" + ) + parsed_args = {} + try: + await call_context.stream_update(StreamTaskMessageFull( + parent_task_message=call_context.task_message, + content=ToolRequestContent( + author="agent", + tool_call_id=call_data['call_id'], + name=call_data['name'], + arguments=parsed_args, + ), + type="full", + )) + except Exception as e: + logger.warning(f"Failed to send tool request full update: {e}") + try: + await call_context.close() + except Exception as e: + logger.warning(f"Failed to close tool request context: {e}") + finally: + call_data['context'] = None + elif isinstance(event, ResponseReasoningSummaryPartAddedEvent): # New reasoning part/summary started - reset accumulator part = getattr(event, 'part', None) @@ -907,6 +975,17 @@ async def get_response( await streaming_context.close() streaming_context = None + # Defensive: close any function call contexts that didn't see a + # ResponseOutputItemDoneEvent (truncated stream, error mid-call). 
class TestStreamingModelFunctionCallArgsStreaming:
    """Function-call argument streaming through ``get_response``.

    Every ``ResponseFunctionCallArgumentsDeltaEvent`` must be surfaced as a
    ``ToolRequestDelta`` update, and ``ResponseOutputItemDoneEvent`` must produce
    one final ``ToolRequestContent`` Full.

    Without this, write-heavy tools (``write_file``, ``apply_patch``) buffer their
    entire argument body inside ``invoke_model_activity`` and the UI sees a
    multi-second freeze while the model is actively producing tokens.
    """

    @staticmethod
    def _build_function_call_stream(arguments_text: str):
        """Build the event sequence for a single streamed function_call.

        Order mirrors production: Added -> N x ArgumentsDelta -> ArgumentsDone
        -> OutputItemDone -> ResponseCompleted. ``spec=...`` lets the
        ``isinstance`` dispatch in production match without triggering pydantic
        validation.

        Returns:
            ``(events, chunks)`` where ``chunks`` are the argument fragments, one
            per ArgumentsDelta event (``[""]`` when ``arguments_text`` is empty).
        """

        def _spec_mock(event_cls, **attrs):
            # Attributes are assigned after construction, exactly like plain
            # ``mock.attr = value`` statements.
            mocked = MagicMock(spec=event_cls)
            for attr_name, attr_value in attrs.items():
                setattr(mocked, attr_name, attr_value)
            return mocked

        function_call = MagicMock()
        for attr_name, attr_value in (
            ("type", "function_call"),
            ("id", "fc_abc"),
            ("call_id", "call_abc"),
            ("name", "write_file"),
            ("arguments", ""),
        ):
            setattr(function_call, attr_name, attr_value)

        # Roughly three fragments, so the per-delta loop runs more than once.
        if arguments_text:
            step = max(1, len(arguments_text) // 3)
            chunks = [arguments_text[pos:pos + step] for pos in range(0, len(arguments_text), step)]
        else:
            chunks = [""]

        stream = [_spec_mock(ResponseOutputItemAddedEvent, item=function_call, output_index=0)]
        stream.extend(
            _spec_mock(ResponseFunctionCallArgumentsDeltaEvent, delta=chunk, output_index=0)
            for chunk in chunks
        )
        stream.append(
            _spec_mock(ResponseFunctionCallArgumentsDoneEvent, arguments=arguments_text, output_index=0)
        )
        stream.append(_spec_mock(ResponseOutputItemDoneEvent, item=function_call, output_index=0))
        stream.append(
            _spec_mock(
                ResponseCompletedEvent,
                response=MagicMock(output=[], usage=MagicMock(), id=None),
            )
        )
        return stream, chunks

    @staticmethod
    def _install_real_task_message(mock_adk_streaming, task_id: str):
        """Give the mocked streaming context a real ``TaskMessage``.

        Production builds ``StreamTaskMessageDelta(parent_task_message=...)``,
        which must pass pydantic validation; a MagicMock ``task_message`` is only
        good enough for tests that assert on the context's ``__aenter__`` call,
        not for tests that exercise ``stream_update`` end-to-end.

        Returns:
            The mocked context object (the ``return_value`` of
            ``streaming_task_message_context``).
        """
        from agentex.types.task_message import TaskMessage
        from agentex.types.task_message_content import ToolRequestContent

        placeholder_request = ToolRequestContent(
            author="agent",
            tool_call_id="call_abc",
            name="write_file",
            arguments={},
        )
        context = mock_adk_streaming.streaming_task_message_context.return_value
        context.task_message = TaskMessage(
            id="msg_test",
            task_id=task_id,
            content=placeholder_request,
            streaming_status="IN_PROGRESS",
        )
        return context

    @staticmethod
    async def _run_turn(streaming_model, events) -> None:
        """Feed ``events`` to the model as the Responses stream and run one turn."""
        response_stream = AsyncMock()
        response_stream.__aiter__.return_value = iter(events)
        streaming_model.client.responses.create = AsyncMock(return_value=response_stream)

        await streaming_model.get_response(
            system_instructions=None,
            input="please write foo",
            model_settings=ModelSettings(),
            tools=[],
            output_schema=None,
            handoffs=[],
            tracing=None,
        )

    @staticmethod
    def _final_tool_request_updates(ctx):
        """Return every ``StreamTaskMessageFull(ToolRequestContent)`` passed
        positionally to ``ctx.stream_update``."""
        from agentex.types.task_message_update import StreamTaskMessageFull
        from agentex.types.task_message_content import ToolRequestContent

        return [
            recorded.args[0]
            for recorded in ctx.stream_update.call_args_list
            if recorded.args
            and isinstance(recorded.args[0], StreamTaskMessageFull)
            and isinstance(recorded.args[0].content, ToolRequestContent)
        ]

    @pytest.mark.asyncio
    async def test_function_call_emits_argument_deltas_and_final_full(
        self, streaming_model, mock_adk_streaming, _streaming_context_vars, sample_task_id
    ):
        """A function_call with well-formed JSON args should produce:
        (1) one streaming context opened with ``ToolRequestContent`` initial_content,
        (2) one ``StreamTaskMessageDelta`` per ``ArgumentsDelta`` event carrying a
            ``ToolRequestDelta`` with the right ``tool_call_id`` and ``arguments_delta``,
        (3) one final ``StreamTaskMessageFull`` with ``ToolRequestContent`` whose
            ``arguments`` is the parsed JSON dict.
        """
        from agentex.types.task_message_delta import ToolRequestDelta
        from agentex.types.task_message_update import StreamTaskMessageDelta
        from agentex.types.task_message_content import ToolRequestContent

        ctx = self._install_real_task_message(mock_adk_streaming, sample_task_id)
        events, chunks = self._build_function_call_stream(
            '{"path": "/tmp/foo.txt", "contents": "hello world"}'
        )

        await self._run_turn(streaming_model, events)

        # 1. Exactly one streaming context was opened with ToolRequestContent.
        opens = [
            opened
            for opened in mock_adk_streaming.streaming_task_message_context.call_args_list
            if isinstance(opened.kwargs.get("initial_content"), ToolRequestContent)
        ]
        assert len(opens) == 1, f"expected one ToolRequest context, got {len(opens)}"
        first_content = opens[0].kwargs["initial_content"]
        assert first_content.tool_call_id == "call_abc"
        assert first_content.name == "write_file"

        # 2. One StreamTaskMessageDelta(ToolRequestDelta) per ArgumentsDelta
        #    event, with the fragment text preserved exactly.
        streamed_deltas = [
            recorded.args[0]
            for recorded in ctx.stream_update.call_args_list
            if recorded.args
            and isinstance(recorded.args[0], StreamTaskMessageDelta)
            and isinstance(recorded.args[0].delta, ToolRequestDelta)
        ]
        assert len(streamed_deltas) == len(chunks)
        for streamed, fragment in zip(streamed_deltas, chunks):
            assert streamed.delta.tool_call_id == "call_abc"
            assert streamed.delta.name == "write_file"
            assert streamed.delta.arguments_delta == fragment

        # 3. One final StreamTaskMessageFull(ToolRequestContent) with parsed args.
        finals = self._final_tool_request_updates(ctx)
        assert len(finals) == 1
        final_content = finals[0].content
        assert final_content.tool_call_id == "call_abc"
        assert final_content.name == "write_file"
        assert final_content.arguments == {"path": "/tmp/foo.txt", "contents": "hello world"}

    @pytest.mark.asyncio
    async def test_function_call_malformed_args_fall_back_to_empty_dict(
        self, streaming_model, mock_adk_streaming, _streaming_context_vars, sample_task_id, caplog
    ):
        """If the model produces invalid JSON for the args, the final
        ``ToolRequestContent`` should carry ``arguments={}`` and a warning should
        be logged. The raw delta stream is preserved either way.
        """
        ctx = self._install_real_task_message(mock_adk_streaming, sample_task_id)

        # Missing closing brace: invalid JSON.
        events, _ = self._build_function_call_stream('{"path": "/tmp/foo.txt", "contents":')

        with caplog.at_level("WARNING"):
            await self._run_turn(streaming_model, events)

        finals = self._final_tool_request_updates(ctx)
        assert len(finals) == 1
        assert finals[0].content.arguments == {}
        assert any("Failed to parse tool call arguments" in r.getMessage() for r in caplog.records)
The runner asserts two things: + +1. **Cross-channel logical equivalence**: yield_events and auto_send produce the + same *logical* sequence of delivered message contents. "Logical" means we + normalise away the streaming-envelope difference: + - yield channel delivers StreamTaskMessageFull(ToolResponseContent) verbatim. + - auto_send channel delivers the same tool-response by opening a streaming + context with the full content and closing it immediately (Start+Done on the + wire), not a Full event. + Both reduce to the same LogicalDelivery(type, identity, payload) tuple; the + conformance test compares those normalised sequences. + + `payload` carries the content that callers actually consume: + - text: initial_content.content prepended, then accumulated delta string + - reasoning: initial_content.summary joined, then accumulated delta string + - tool_request: the arguments dict (JSON-sorted), from Start content + - tool_response: the content value (str) + This catches a channel that delivers the right structural shape but corrupts, + drops, or omits initial_content (including reasoning summary) or payload. + +2. **Span signal equivalence**: each channel is driven with its own recording + tracer that captures every SpanSignal it actually receives in handle(); the + two channels' recorded signal lists must be identical. Comparing what each + channel genuinely emitted (rather than re-deriving from the events) catches a + regression where a channel skips deriver.observe() for some event type. + +Registry shared-state hazard: `_REGISTRY` is process-global. Every `test_*.py` +module that calls `register()` at import time contributes to it, so a module +that parametrizes over `all_fixtures()` will see fixtures registered by ANY +other conformance module imported earlier in the same pytest process (collection +order is not guaranteed). To stay deterministic, each future harness conformance +module should register and parametrize over its OWN fixtures (e.g. 
keep a +module-local list it both registers and parametrizes), rather than relying on +cross-module global accumulation via `all_fixtures()`. + +Design decision — Full-message handling in auto_send +---------------------------------------------------- +auto_send posts a StreamTaskMessageFull (tool_request or tool_response) by +opening a streaming context with the full content and closing it immediately, +rather than calling adk.messages.create. This open+close approach is retained +because: + - StreamingTaskMessageContext.close() persists initial_content when no deltas + have been streamed, so the message IS correctly persisted. + - It mirrors the pattern already used by the real _langgraph_async.py harness, + keeping behavioural parity. + - Switching to adk.messages.create would require an additional injectable + dependency, adding surface area for no observable benefit. +The conformance test treats this as an ACCEPTABLE envelope difference: at the +logical-content level, Full(ToolResponseContent) from yield and +Start(content)+Done from auto_send are equivalent. The recorded span signals are +identical because both adapters drive the same SpanDeriver.observe() call +sequence and forward every signal to their tracer. + +AGX1-377 fix: auto_send now DELIVERS streamed tool-request messages (Start+Done) +instead of dropping them. The conformance normaliser previously suppressed the +delivery for Start(tool_request)+Done on the yield channel to match auto_send's +old drop behaviour. That suppression is now removed: both channels produce a +LogicalDelivery for a streamed tool_request, and the cross-channel assertion +verifies it is delivered on both.
+""" + +from __future__ import annotations + +import json +import types as _types +from typing import Any, NamedTuple, override +from dataclasses import dataclass + +from agentex.types.text_delta import TextDelta +from agentex.types.task_message import TaskMessage +from agentex.lib.core.harness.types import SpanSignal, StreamTaskMessage +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.lib.core.harness.span_derivation import SpanDeriver + + +@dataclass +class Fixture: + name: str + events: list[StreamTaskMessage] + + +_REGISTRY: list[Fixture] = [] + + +def register(fixture: Fixture) -> None: + _REGISTRY.append(fixture) + + +def all_fixtures() -> list[Fixture]: + return list(_REGISTRY) + + +def derive_all(events: list[StreamTaskMessage]) -> list[SpanSignal]: + d = SpanDeriver() + out: list[SpanSignal] = [] + for e in events: + out.extend(d.observe(e)) + out.extend(d.flush()) + return out + + +# --------------------------------------------------------------------------- +# Logical delivery normalisation +# --------------------------------------------------------------------------- + + +class LogicalDelivery(NamedTuple): + """A single logically-delivered message, channel-agnostic. + + `content_type` is the .type of the content (e.g. "text", "reasoning", + "tool_request", "tool_response"). `identity` is a frozenset of key=value + pairs that uniquely identify the content (e.g. tool_call_id for tool + messages, or index for text/reasoning). 
def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDelivery]:
    """Reduce the yield channel's verbatim event list to logical deliveries.

    A logical delivery is produced by:
      - a Full event carrying tool_request / tool_response content, or
      - a Start ... Done pair (matched on ``index``) whose Start content is
        text, reasoning, or tool_request.

    Payload per content type:
      - text: Start ``content.content`` followed by the accumulated text deltas
      - reasoning: Start ``content.summary`` entries followed by the accumulated
        reasoning-content deltas (so a dropped summary is detected)
      - tool_request: JSON-sorted ``arguments`` of the Start / Full content
        (AGX1-377: delivered on both channels, no longer suppressed)
      - tool_response: ``str(content.content)`` of the Full content

    Events whose ``index`` is None, Done events without a matching Start, and
    Full events with any other content type contribute nothing.
    """
    from agentex.types.text_content import TextContent
    from agentex.types.reasoning_content import ReasoningContent
    from agentex.types.tool_request_content import ToolRequestContent
    from agentex.types.tool_response_content import ToolResponseContent

    def _tool_delivery(kind: str, tool_content: Any, payload: str) -> LogicalDelivery:
        # Tool messages are identified by call id + tool name on both channels.
        return LogicalDelivery(
            content_type=kind,
            identity=frozenset(
                {
                    ("tool_call_id", tool_content.tool_call_id),
                    ("name", tool_content.name),
                }
            ),
            payload=payload,
        )

    result: list[LogicalDelivery] = []
    open_messages: dict[int, Any] = {}  # index -> Start content awaiting its Done
    fragments: dict[int, list[str]] = {}  # index -> seed text + delta strings

    for ev in events:
        if isinstance(ev, StreamTaskMessageStart):
            if ev.index is None:
                continue
            initial = ev.content
            open_messages[ev.index] = initial
            # Seed with the initial content so that a channel which drops it
            # (but still delivers the deltas) produces a different payload.
            if isinstance(initial, TextContent) and initial.content:
                fragments[ev.index] = [initial.content]
            elif isinstance(initial, ReasoningContent) and initial.summary:
                fragments[ev.index] = list(initial.summary)
            else:
                fragments[ev.index] = []
        elif isinstance(ev, StreamTaskMessageDelta):
            if ev.index is None or ev.delta is None:
                continue
            piece = ev.delta
            if isinstance(piece, TextDelta) and piece.text_delta:
                fragments.setdefault(ev.index, []).append(piece.text_delta)
            elif isinstance(piece, ReasoningContentDelta) and piece.content_delta:
                fragments.setdefault(ev.index, []).append(piece.content_delta)
        elif isinstance(ev, StreamTaskMessageDone):
            if ev.index is None or ev.index not in open_messages:
                continue
            body = open_messages.pop(ev.index)
            pieces = fragments.pop(ev.index, [])
            kind = getattr(body, "type", None) or ""
            if kind in ("text", "reasoning"):
                result.append(
                    LogicalDelivery(
                        content_type=kind,
                        identity=frozenset({("index", ev.index)}),
                        payload="".join(pieces),
                    )
                )
            elif kind == "tool_request" and isinstance(body, ToolRequestContent):
                # AGX1-377: a streamed tool request is a delivery on both
                # channels, so it must be reported here as well.
                result.append(_tool_delivery(kind, body, json.dumps(body.arguments, sort_keys=True)))
        elif isinstance(ev, StreamTaskMessageFull):
            body = ev.content
            kind = getattr(body, "type", None) or ""
            if kind == "tool_response" and isinstance(body, ToolResponseContent):
                result.append(_tool_delivery(kind, body, str(body.content)))
            elif kind == "tool_request" and isinstance(body, ToolRequestContent):
                result.append(_tool_delivery(kind, body, json.dumps(body.arguments, sort_keys=True)))

    return result
+ async def stream_update(self, update: Any) -> Any: + self.sink.append(("update", update)) + return update + + +class _FakeStreaming: + """Fake streaming backend; records every context lifecycle event.""" + + def __init__(self) -> None: + self.sink: list[Any] = [] + + def streaming_task_message_context( + self, + task_id: str, + initial_content: Any, + streaming_mode: str = "coalesced", + created_at: Any = None, + ) -> _FakeCtx: + ctype = getattr(initial_content, "type", None) or "" + self.sink.append(("ctx", ctype, initial_content)) + return _FakeCtx(self.sink, ctype, initial_content) + + +class _FakeTracing: + """Minimal tracing backend: records started/ended span names + outputs.""" + + def __init__(self) -> None: + self.started: list[str] = [] + self.ended: list[Any] = [] + + async def start_span( + self, + *, + trace_id: str, + name: str, + input: Any = None, + parent_id: Any = None, + data: Any = None, + task_id: Any = None, + ) -> Any: + self.started.append(name) + return _types.SimpleNamespace() + + async def end_span(self, *, trace_id: str, span: Any) -> None: + self.ended.append(getattr(span, "output", None)) + + +class _RecordingTracer(SpanTracer): + """SpanTracer that records every SpanSignal it actually receives. + + Each delivery channel calls `tracer.handle(signal)` for every signal it + derives from the stream, so `received_signals` captures what the channel + genuinely emitted β€” not a re-derivation. Comparing the two channels' + recorded lists catches regressions where a channel skips + `deriver.observe(event)` for some event type. 
+ """ + + def __init__(self, tracing: Any) -> None: + super().__init__( + trace_id="conformance-trace", + parent_span_id="conformance-parent", + tracing=tracing, + ) + self.received_signals: list[SpanSignal] = [] + + @override + async def handle(self, signal: SpanSignal) -> None: + self.received_signals.append(signal) + await super().handle(signal) + + +async def _gen(events: list[StreamTaskMessage]): # type: ignore[return] + for e in events: + yield e + + +def _auto_send_logical_deliveries(sink: list[Any]) -> list[LogicalDelivery]: + """Extract logical deliveries from the auto_send fake streaming sink. + + Each context lifecycle in the sink looks like: + ("ctx", ctype, content) -- context created + ("open", ctype, content) -- context __aenter__ + [("update", delta), ...] -- optional deltas (StreamTaskMessageDelta) + ("close", ctype) -- context closed + + A logical delivery corresponds to each open+close pair. For text/reasoning + we identify by sequential position and build the payload by prepending the + initial_content text (TextContent.content) or summary (ReasoningContent.summary) + to accumulated deltas. This matches _yield_logical_deliveries so a channel + that drops initial_content or reasoning summary fails the comparison. + For tool messages we use tool_call_id + name and capture arguments/content. + """ + from agentex.types.text_content import TextContent + from agentex.types.reasoning_content import ReasoningContent + from agentex.types.tool_request_content import ToolRequestContent + from agentex.types.tool_response_content import ToolResponseContent + + deliveries: list[LogicalDelivery] = [] + open_idx = 0 + while open_idx < len(sink): + entry = sink[open_idx] + if entry[0] == "ctx": + ctype: str = entry[1] + content: Any = entry[2] + found_open = False + delta_parts: list[str] = [] + # Seed delta_parts with initial_content so payload comparison + # catches a channel that drops initial_content but delivers deltas. 
+ if isinstance(content, TextContent) and content.content: + delta_parts = [content.content] + elif isinstance(content, ReasoningContent) and content.summary: + delta_parts = list(content.summary) + for j in range(open_idx + 1, len(sink)): + if sink[j][0] == "open" and sink[j][1] == ctype and not found_open: + found_open = True + elif found_open and sink[j][0] == "update": + # Accumulate delta content from StreamTaskMessageDelta + update = sink[j][1] + if isinstance(update, StreamTaskMessageDelta) and update.delta is not None: + if isinstance(update.delta, TextDelta) and update.delta.text_delta: + delta_parts.append(update.delta.text_delta) + elif isinstance(update.delta, ReasoningContentDelta) and update.delta.content_delta: + delta_parts.append(update.delta.content_delta) + elif sink[j][0] == "close" and sink[j][1] == ctype and found_open: + # Matched open+close: emit logical delivery with payload + if ctype in ("text", "reasoning"): + count = sum(1 for k in range(open_idx) if sink[k][0] == "ctx" and sink[k][1] == ctype) + deliveries.append( + LogicalDelivery( + content_type=ctype, + identity=frozenset({("seq", count)}), + payload="".join(delta_parts), + ) + ) + elif ctype == "tool_response": + if isinstance(content, ToolResponseContent): + deliveries.append( + LogicalDelivery( + content_type=ctype, + identity=frozenset( + { + ("tool_call_id", content.tool_call_id), + ("name", content.name), + } + ), + payload=str(content.content), + ) + ) + elif ctype == "tool_request": + if isinstance(content, ToolRequestContent): + deliveries.append( + LogicalDelivery( + content_type=ctype, + identity=frozenset( + { + ("tool_call_id", content.tool_call_id), + ("name", content.name), + } + ), + payload=json.dumps(content.arguments, sort_keys=True), + ) + ) + open_idx = j + 1 + break + else: + open_idx += 1 + else: + open_idx += 1 + + return deliveries + + +def _yield_text_reasoning_seq(deliveries: list[LogicalDelivery]) -> list[LogicalDelivery]: + """Re-key text/reasoning 
deliveries from index-based to seq-based identity. + + The yield channel uses event.index as identity; auto_send uses a sequential + counter. To compare across channels, normalise both to sequential position + within each content type. + """ + result: list[LogicalDelivery] = [] + counts: dict[str, int] = {} + for d in deliveries: + if d.content_type in ("text", "reasoning"): + seq = counts.get(d.content_type, 0) + counts[d.content_type] = seq + 1 + result.append( + LogicalDelivery( + content_type=d.content_type, + identity=frozenset({("seq", seq)}), + payload=d.payload, + ) + ) + else: + result.append(d) + return result + + +async def run_cross_channel_conformance( + fixture: Fixture, +) -> tuple[list[LogicalDelivery], list[LogicalDelivery], list[SpanSignal], list[SpanSignal]]: + """Run both channels over a fixture; return (yield_deliveries, auto_deliveries, + yield_spans, auto_spans). + + The caller asserts yield_deliveries == auto_deliveries and + yield_spans == auto_spans. The span signals are the ones each channel's + tracer ACTUALLY recorded while delivering (not a re-derivation), so a + regression where a channel skips deriver.observe() for some event type is + caught. 
+ """ + from agentex.lib.core.harness.auto_send import auto_send + from agentex.lib.core.harness.yield_delivery import yield_events + + # --- yield channel --- + tracer_yield = _RecordingTracer(tracing=_FakeTracing()) + yield_out = [e async for e in yield_events(_gen(fixture.events), tracer=tracer_yield)] + + # Span signals the yield channel actually emitted to its tracer + yield_spans = tracer_yield.received_signals + + # Logical deliveries from yield output + yield_deliveries = _yield_text_reasoning_seq(_yield_logical_deliveries(yield_out)) + + # --- auto_send channel --- + tracer_auto = _RecordingTracer(tracing=_FakeTracing()) + fake_streaming = _FakeStreaming() + await auto_send( + _gen(fixture.events), + task_id="conformance-task", + tracer=tracer_auto, + streaming=fake_streaming, + ) + + # Span signals the auto_send channel actually emitted to its tracer + auto_spans = tracer_auto.received_signals + + # Logical deliveries from what the streaming backend received + auto_deliveries = _auto_send_logical_deliveries(fake_streaming.sink) + + return yield_deliveries, auto_deliveries, yield_spans, auto_spans diff --git a/tests/lib/core/harness/conformance/test_conformance.py b/tests/lib/core/harness/conformance/test_conformance.py new file mode 100644 index 000000000..6d5f8ca66 --- /dev/null +++ b/tests/lib/core/harness/conformance/test_conformance.py @@ -0,0 +1,285 @@ +"""Cross-channel conformance tests: yield_events vs auto_send. + +What is asserted +---------------- +For each fixture the conformance runner drives BOTH delivery channels and +verifies two guarantees: + +1. **Logical-delivery equivalence**: the sequence of logically-delivered + messages is identical across channels. "Logical" normalises away the + streaming-envelope difference: + - yield channel delivers StreamTaskMessageFull(ToolResponseContent) as-is. + - auto_send delivers the same tool-response by opening a streaming context + with the full content and closing it immediately. 
+ Both collapse to LogicalDelivery(content_type, identity, payload) tuples + that compare equal. The payload includes initial_content (TextContent.content + and ReasoningContent.summary) so a channel that drops initial content fails. + +2. **Span signal equivalence**: both channels feed the same pure SpanDeriver + over the same event sequence, so the derived span signals must be identical. + +What is NOT asserted +-------------------- +Raw wire-level event shapes are NOT compared (that would fail by design: the +Full vs Start+Done envelope difference is a documented, acceptable choice in +auto_send β€” see runner.py for the rationale). + +AGX1-377 fix: auto_send now delivers streamed tool-request messages. The +suppression that previously prevented the yield normaliser from emitting a +LogicalDelivery for Start(tool_request)+Done is removed. Both channels now +produce a delivery for streamed tool_request, verified by the +"streamed-tool-request" fixture. +""" + +from __future__ import annotations + +import pytest + +from agentex.types.text_delta import TextDelta +from agentex.types.text_content import TextContent +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta + +from .runner import ( + Fixture, + register, + derive_all, + all_fixtures, + run_cross_channel_conformance, +) + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +_FIXTURES: list[Fixture] = [ + # fixture 1: single tool call β€” tool_request delivered via Full (classic path) + # plus a streamed tool_response via 
Full. Both channels should deliver both. + Fixture( + name="builtin-single-tool", + events=[ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", author="agent", tool_call_id="c", name="Bash", arguments={} + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="c", name="Bash", content="ok" + ), + ), + ], + ), + # fixture 2: streaming text β€” exercises the text start/delta/done path. + # Uses non-empty initial_content so the payload comparison catches a channel + # that drops StreamTaskMessageStart.content (Greptile id 3438655533, P1). + Fixture( + name="streaming-text", + events=[ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content="Init"), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="Hello"), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta=" world"), + ), + StreamTaskMessageDone(type="done", index=0), + ], + ), + # fixture 3: reasoning block β€” exercises reasoning span open/close + delivery. + # ReasoningContent.summary is included in the payload so a channel that drops + # the reasoning-summary fails (Greptile id 3438655533, P1). + Fixture( + name="reasoning-block", + events=[ + StreamTaskMessageStart( + type="start", + index=0, + content=ReasoningContent( + type="reasoning", + author="agent", + summary=["Thinking..."], + ), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningContentDelta( + type="reasoning_content", + content_index=0, + content_delta="step 1", + ), + ), + StreamTaskMessageDone(type="done", index=0), + ], + ), + # fixture 4: streamed tool_request (AGX1-377 fix) β€” tool_request delivered + # via Start+Done (no Full). 
auto_send now delivers this instead of dropping + # it. Both channels must produce a LogicalDelivery for this fixture. + Fixture( + name="streamed-tool-request", + events=[ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="tr-1", + name="Read", + arguments={"path": "/tmp/foo"}, + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="tr-1", + name="Read", + content="file contents", + ), + ), + ], + ), + # fixture 5: parallel tool calls + a tool that errors (AGX1-373 review, + # danielmillerp). The earlier fixtures only exercise one tool at a time, so + # equivalence is proven over trivially-orderable streams. This stresses the + # representative case: two tool spans open SIMULTANEOUSLY (p-ls opens via the + # streamed Start+Done path, p-read opens via Full while p-ls is still open), + # then close in a different order than they opened, and one of them returns + # an error. It guards against the two channels agreeing with each other while + # both mishandling interleaved/parallel spans or a failing tool. + # + # The failing tool sets ToolResponseContent.is_error=True (AGX1-371), which + # the span deriver threads onto the closed tool span's CloseSpan.is_error. + # Both channels feed the same deriver, so the recorded span signals β€” error + # status included β€” must match. + Fixture( + name="parallel-tools-with-error", + events=[ + # p-ls: streamed tool_request (opens its span at Done). + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="p-ls", + name="Bash", + arguments={"command": "ls /nope"}, + ), + ), + StreamTaskMessageDone(type="done", index=0), + # p-read: Full tool_request opens a second span while p-ls is open. 
+ StreamTaskMessageFull( + type="full", + index=1, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="p-read", + name="Read", + arguments={"path": "/etc/hosts"}, + ), + ), + # p-ls errors and closes first (close order != open order). + StreamTaskMessageFull( + type="full", + index=2, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="p-ls", + name="Bash", + content="Error: ls: /nope: No such file or directory", + is_error=True, + ), + ), + # p-read succeeds and closes second. + StreamTaskMessageFull( + type="full", + index=3, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="p-read", + name="Read", + content="127.0.0.1 localhost", + ), + ), + ], + ), +] + +# Register all fixtures for backward-compatible use via all_fixtures() +for _f in _FIXTURES: + register(_f) + + +# --------------------------------------------------------------------------- +# Cross-channel conformance: logical equivalence + span equivalence +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("fixture", _FIXTURES, ids=lambda f: f.name) +@pytest.mark.asyncio +async def test_cross_channel_equivalence(fixture: Fixture) -> None: + """Assert that yield_events and auto_send produce equivalent logical + deliveries and identical span signals for every fixture. + + This is the real cross-channel guarantee: the two delivery adapters + agree on WHAT was delivered (logical content) and HOW spans were derived, + even though their streaming-envelope shapes differ (Full vs Start+Done for + tool messages). + + The span signals are the ones each channel's tracer ACTUALLY recorded while + delivering, not a re-derivation, so a regression where one channel skips + deriver.observe() for some event type is caught here. 
+ """ + yield_deliveries, auto_deliveries, yield_spans, auto_spans = await run_cross_channel_conformance(fixture) + + assert yield_deliveries == auto_deliveries, ( + f"[{fixture.name}] logical deliveries differ:\n yield: {yield_deliveries}\n auto_send: {auto_deliveries}" + ) + assert yield_spans == auto_spans, ( + f"[{fixture.name}] span signals differ:\n yield: {yield_spans}\n auto_send: {auto_spans}" + ) + + +# --------------------------------------------------------------------------- +# Backward-compatible determinism test (kept for regression coverage) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("fixture", all_fixtures(), ids=lambda f: f.name) +def test_span_derivation_is_deterministic(fixture: Fixture) -> None: + """Span derivation over the same event list is idempotent. + + Retained as a lightweight regression guard. The primary cross-channel + guarantee is asserted in test_cross_channel_equivalence above. + """ + assert derive_all(fixture.events) == derive_all(fixture.events) diff --git a/tests/lib/core/harness/test_auto_send.py b/tests/lib/core/harness/test_auto_send.py new file mode 100644 index 000000000..1948e9196 --- /dev/null +++ b/tests/lib/core/harness/test_auto_send.py @@ -0,0 +1,490 @@ +"""Tests for auto_send delivery adapter. + +The fake mirrors the real StreamingTaskMessageContext API exactly: +- streaming_task_message_context(...) returns a context object (synchronously) +- open the context via __aenter__ (returns self after creating the task message) +- stream deltas via ctx.stream_update(StreamTaskMessageDelta(...)) +- close via ctx.close() (NOT __aexit__) + +This mirrors _langgraph_async.py lines 62-78 and 100-127. 
+""" + +import types as _types +from datetime import datetime + +import pytest + +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.types.task_message_delta import TextDelta +from agentex.types.tool_request_delta import ToolRequestDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.lib.core.harness.auto_send import auto_send +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent + + +class _FakeCtx: + """Mirrors StreamingTaskMessageContext: __aenter__ opens (returns self with task_message set), + close() closes. stream_update records the call. + + task_message is a real TaskMessage instance so that auto_send can use it + as parent_task_message in StreamTaskMessageDelta without Pydantic validation errors. + """ + + def __init__(self, sink, content_type, initial_content): + self.sink = sink + self.content_type = content_type + # Real TaskMessage so StreamTaskMessageDelta(parent_task_message=...) 
passes validation + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) + + async def __aenter__(self): + self.sink.append(("open", self.content_type)) + return self + + async def __aexit__(self, *a): + # __aexit__ delegates to close in the real impl; keep for safety + await self.close() + return False + + async def close(self): + self.sink.append(("close", self.content_type)) + + async def stream_update(self, update): + self.sink.append(("update", update)) + return update + + +class _FakeStreaming: + """Mirrors StreamingService: streaming_task_message_context returns a context object.""" + + def __init__(self): + self.sink = [] + self.recorded_created_at: list[datetime | None] = [] + + def streaming_task_message_context(self, task_id, initial_content, streaming_mode="coalesced", created_at=None): + ctype = getattr(initial_content, "type", None) + self.sink.append(("ctx", ctype)) + self.recorded_created_at.append(created_at) + return _FakeCtx(self.sink, ctype, initial_content) + + +async def _gen(events): + for e in events: + yield e + + +# --------------------------------------------------------------------------- +# Test 1: text streaming β€” open, stream deltas, close; return accumulated text +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_streams_text_and_returns_final_text(): + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="Hel"), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="lo"), + ), + StreamTaskMessageDone(type="done", index=0), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + + assert result.final_text == "Hello" + + kinds = 
[s[0] for s in streaming.sink] + # A context was created for the text content + assert kinds[0] == "ctx" + # It was opened and closed + assert "open" in kinds + assert "close" in kinds + # Exactly two updates were streamed (one per delta) + updates = [s for s in streaming.sink if s[0] == "update"] + assert len(updates) == 2 + + +# --------------------------------------------------------------------------- +# Test 2: tool_request Full + tool_response Full β€” each posts one full message +# (open context with the content, no deltas, close immediately) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_posts_full_tool_messages(): + streaming = _FakeStreaming() + events = [ + # Two Full events post two messages (open+close immediately, no deltas). + StreamTaskMessageFull( + type="full", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="c1", + name="Bash", + arguments={"cmd": "ls"}, + ), + ), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="c1", + name="Bash", + content="file.py", + ), + ), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + + assert result.final_text == "" + + # Each Full event opens and closes exactly one context. 
+ ctx_events = [s for s in streaming.sink if s[0] == "ctx"] + assert len(ctx_events) == 2 + content_types = [s[1] for s in ctx_events] + assert content_types == ["tool_request", "tool_response"] + + # Each context is opened and closed + opens = [s for s in streaming.sink if s[0] == "open"] + closes = [s for s in streaming.sink if s[0] == "close"] + assert len(opens) == 2 + assert len(closes) == 2 + + # No stream_update calls (full messages have no deltas) + updates = [s for s in streaming.sink if s[0] == "update"] + assert len(updates) == 0 + + +# --------------------------------------------------------------------------- +# Test 3: tracing β€” spans are derived and handed to the tracer +# --------------------------------------------------------------------------- + + +class _RecordTracing: + def __init__(self): + self.started, self.ended = [], [] + + async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): + self.started.append(name) + return _types.SimpleNamespace() + + async def end_span(self, *, trace_id, span): + self.ended.append(getattr(span, "output", None)) + + +@pytest.mark.asyncio +async def test_auto_send_derives_tool_spans_via_tracer(): + fake_tracing = _RecordTracing() + tracer = SpanTracer(trace_id="t", parent_span_id="p", tracing=fake_tracing) + streaming = _FakeStreaming() + + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="c1", + name="Bash", + arguments={}, + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="c1", + name="Bash", + content="ok", + ), + ), + ] + + result = await auto_send(_gen(events), task_id="task1", tracer=tracer, streaming=streaming) + + assert result.final_text == "" + assert fake_tracing.started == ["Bash"] + assert fake_tracing.ended == 
["ok"] + + +# --------------------------------------------------------------------------- +# Test 4: text followed by a tool Full β€” text context is closed before Full +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_closes_text_context_before_full_message(): + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="Hi"), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="c2", + name="read_file", + arguments={}, + ), + ), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + assert result.final_text == "Hi" + + # Verify ordering: text ctx opens, updates, closes; then tool_request ctx opens, closes + event_sequence = [(s[0], s[1]) for s in streaming.sink] + text_open_idx = next(i for i, s in enumerate(event_sequence) if s == ("open", "text")) + text_close_idx = next(i for i, s in enumerate(event_sequence) if s == ("close", "text")) + tool_open_idx = next(i for i, s in enumerate(event_sequence) if s == ("open", "tool_request")) + assert text_open_idx < text_close_idx < tool_open_idx + + +# --------------------------------------------------------------------------- +# Test 5: midstream error β€” propagates AND the open context is closed (finally) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_open_context_closed_on_midstream_error(): + streaming = _FakeStreaming() + + async def _exploding_gen(): + yield StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ) + raise 
RuntimeError("boom") + + with pytest.raises(RuntimeError, match="boom"): + await auto_send(_exploding_gen(), task_id="task1", tracer=None, streaming=streaming) + + # The text context that was opened mid-stream was closed by the finally block. + assert ("open", "text") in [(s[0], s[1]) for s in streaming.sink] + assert ("close", "text") in [(s[0], s[1]) for s in streaming.sink] + + +# --------------------------------------------------------------------------- +# Test 6: streamed tool_request delivered (AGX1-377 core) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_streams_tool_request(): + """A Start(ToolRequestContent) MUST open a streaming context (AGX1-377).""" + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="c_tool", + name="Bash", + arguments={}, + ), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ToolRequestDelta( + type="tool_request", + tool_call_id="c_tool", + name="Bash", + arguments_delta='{"cmd": "ls"}', + ), + ), + StreamTaskMessageDone(type="done", index=0), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + + assert result.final_text == "" + + ctx_events = [s for s in streaming.sink if s[0] == "ctx"] + assert len(ctx_events) == 1 + assert ctx_events[0][1] == "tool_request" + + opens = [s for s in streaming.sink if s[0] == "open"] + closes = [s for s in streaming.sink if s[0] == "close"] + assert len(opens) == 1 + assert len(closes) == 1 + + updates = [s for s in streaming.sink if s[0] == "update"] + assert len(updates) == 1 + + +# --------------------------------------------------------------------------- +# Test 7: interleaved indexes route correctly +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def 
test_auto_send_interleaved_indexes_route_correctly(): + """Deltas must be routed to the correct index-keyed context.""" + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageStart( + type="start", + index=1, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="A"), + ), + StreamTaskMessageDelta( + type="delta", + index=1, + delta=TextDelta(type="text", text_delta="B"), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageDone(type="done", index=1), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + + ctx_events = [s for s in streaming.sink if s[0] == "ctx"] + assert len(ctx_events) == 2 + + opens = [s for s in streaming.sink if s[0] == "open"] + assert len(opens) == 2 + + updates = [s for s in streaming.sink if s[0] == "update"] + assert len(updates) == 2 + + update_deltas = [s[1].delta for s in streaming.sink if s[0] == "update"] + text_deltas = [d.text_delta for d in update_deltas if isinstance(d, TextDelta)] + assert set(text_deltas) == {"A", "B"} + + +# --------------------------------------------------------------------------- +# Test 8: final_text returns last text segment for multi-step +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_final_text_last_segment(): + """final_text must be the LAST text segment, not accumulated across all turns.""" + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="First"), + ), + StreamTaskMessageDone(type="done", index=0), + 
StreamTaskMessageStart( + type="start", + index=1, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=1, + delta=TextDelta(type="text", text_delta="Second"), + ), + StreamTaskMessageDone(type="done", index=1), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + assert result.final_text == "Second" + + +# --------------------------------------------------------------------------- +# Test 9: Full(TextContent) contributes to final_text +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_full_text_content_sets_final_text(): + """A Full(TextContent) must contribute its text to final_text.""" + streaming = _FakeStreaming() + events = [ + StreamTaskMessageFull( + type="full", + index=0, + content=TextContent(type="text", author="agent", content="hello"), + ), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + assert result.final_text == "hello" + + +# --------------------------------------------------------------------------- +# Test 10: created_at is forwarded to streaming context (AGX1-378) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_created_at_forwarded(): + """created_at must be forwarded to every streaming_task_message_context call.""" + streaming = _FakeStreaming() + dt = datetime(2025, 1, 15, 12, 0, 0) + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="c_ts", + name="Bash", + arguments={}, + ), + ), + ] + await auto_send(_gen(events), task_id="task1", tracer=None, 
streaming=streaming, created_at=dt) + + assert all(ts == dt for ts in streaming.recorded_created_at) diff --git a/tests/lib/core/harness/test_emitter.py b/tests/lib/core/harness/test_emitter.py new file mode 100644 index 000000000..df155ec44 --- /dev/null +++ b/tests/lib/core/harness/test_emitter.py @@ -0,0 +1,148 @@ +import pytest + +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.types import TurnUsage +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) + + +class _FakeTracing: + async def start_span(self, **kw): + return None + + async def end_span(self, **kw): + pass + + +class _FakeCtx: + """Minimal StreamingTaskMessageContext fake (see test_auto_send.py).""" + + def __init__(self, sink, content_type, initial_content): + self.sink = sink + self.content_type = content_type + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) + + async def __aenter__(self): + self.sink.append(("open", self.content_type)) + return self + + async def __aexit__(self, *a): + await self.close() + return False + + async def close(self): + self.sink.append(("close", self.content_type)) + + async def stream_update(self, update): + self.sink.append(("update", update)) + return update + + +class _FakeStreaming: + def __init__(self): + self.sink = [] + + def streaming_task_message_context(self, task_id, initial_content, streaming_mode="coalesced", created_at=None): + ctype = getattr(initial_content, "type", None) + self.sink.append(("ctx", ctype)) + return _FakeCtx(self.sink, ctype, initial_content) + + +class _Turn: + def __init__(self, events_list, usage): + self._events_list = events_list + self._usage = usage + + @property + async def events(self): + for e in 
self._events_list: + yield e + + def usage(self): + return self._usage + + +@pytest.mark.asyncio +async def test_emitter_yield_mode_passes_through(): + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="hi")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = _Turn(events, TurnUsage(model="m")) + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + out = [e async for e in emitter.yield_turn(turn)] + assert out == events + + +@pytest.mark.asyncio +async def test_emitter_tracing_default_on_when_trace_id_present(): + # Inject a fake tracing backend so the test env doesn't need temporalio. + # This exercises the default-on path (tracer=None) when trace_id is truthy. + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id="p", tracing=_FakeTracing()) + assert emitter.tracer is not None + + +@pytest.mark.asyncio +async def test_emitter_tracing_overridable_off(): + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id="p", tracer=False) + assert emitter.tracer is None + + +@pytest.mark.asyncio +async def test_emitter_auto_send_turn_returns_usage(): + usage = TurnUsage(model="m", input_tokens=5) + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="Hello")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = _Turn(events, usage) + fake = _FakeStreaming() + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None, streaming=fake) + result = await emitter.auto_send_turn(turn) + assert result.usage == usage + assert result.final_text == "Hello" + + +class _ContractTurn: + """A turn that honors the single-pass contract: usage() is the empty default + UNTIL `events` is exhausted, then the real usage (this is how real harness + turns behave β€” they populate usage 
while the stream is consumed).""" + + def __init__(self, events_list, real_usage): + self._events_list = events_list + self._real_usage = real_usage + self._exhausted = False + + @property + async def events(self): + for e in self._events_list: + yield e + self._exhausted = True + + def usage(self): + return self._real_usage if self._exhausted else TurnUsage(model="m") + + +@pytest.mark.asyncio +async def test_emitter_auto_send_turn_reads_usage_after_exhaustion(): + # Regression: auto_send_turn must read turn.usage() AFTER consuming the + # stream, not eagerly when building the auto_send call (which would capture + # the empty default and lose real token usage on the auto_send path). + real_usage = TurnUsage(model="m", input_tokens=11, output_tokens=22, total_tokens=33, num_llm_calls=2) + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="hi")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = _ContractTurn(events, real_usage) + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None, streaming=_FakeStreaming()) + result = await emitter.auto_send_turn(turn) + assert result.usage == real_usage + assert result.usage.input_tokens == 11 and result.usage.total_tokens == 33 diff --git a/tests/lib/core/harness/test_span_derivation.py b/tests/lib/core/harness/test_span_derivation.py new file mode 100644 index 000000000..51e2ede2c --- /dev/null +++ b/tests/lib/core/harness/test_span_derivation.py @@ -0,0 +1,286 @@ +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.types import OpenSpan, CloseSpan +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.tool_request_delta import ToolRequestDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + 
StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.core.harness.span_derivation import SpanDeriver + + +def _signals(deriver, events): + out = [] + for e in events: + out.extend(deriver.observe(e)) + out.extend(deriver.flush()) + return out + + +def _tool_req(idx, tcid, name, args): + return StreamTaskMessageStart( + type="start", + index=idx, + content=ToolRequestContent(type="tool_request", author="agent", tool_call_id=tcid, name=name, arguments=args), + ) + + +def test_text_only_yields_no_spans(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, delta=None), + StreamTaskMessageDone(type="done", index=0), + ] + assert _signals(d, events) == [] + + +def test_single_tool_opens_on_done_closes_on_response(): + d = SpanDeriver() + events = [ + _tool_req(0, "call_1", "Bash", {"cmd": "ls"}), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="call_1", name="Bash", content="files" + ), + ), + ] + sigs = _signals(d, events) + assert sigs == [ + OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"}), + CloseSpan(key="call_1", output="files", is_complete=True), + ] + # No status reported -> CloseSpan carries is_error=None. 
+ assert sigs[1].is_error is None + + +def test_tool_response_is_error_propagates_to_close_span(): + """ToolResponseContent.is_error flows onto the CloseSpan so a derived tool + span can be marked as a failure (AGX1-371).""" + d = SpanDeriver() + events = [ + _tool_req(0, "call_err", "Bash", {"cmd": "false"}), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="call_err", + name="Bash", + content="boom", + is_error=True, + ), + ), + ] + sigs = _signals(d, events) + assert sigs == [ + OpenSpan(key="call_err", kind="tool", name="Bash", input={"cmd": "false"}), + CloseSpan(key="call_err", output="boom", is_complete=True, is_error=True), + ] + + +def test_reasoning_opens_on_start_closes_on_done(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", index=0, content=ReasoningContent(type="reasoning", author="agent", summary=[], content=[]) + ), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="reasoning:0", kind="reasoning", name="reasoning", input={}) + assert sigs[1] == CloseSpan(key="reasoning:0", output=None, is_complete=True) + + +def test_parallel_tools_pair_by_tool_call_id(): + d = SpanDeriver() + events = [ + _tool_req(0, "a", "T1", {}), + _tool_req(1, "b", "T2", {}), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageDone(type="done", index=1), + StreamTaskMessageFull( + type="full", + index=2, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="b", name="T2", content="rb" + ), + ), + StreamTaskMessageFull( + type="full", + index=3, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="a", name="T1", content="ra" + ), + ), + ] + sigs = _signals(d, events) + opens = [s for s in sigs if isinstance(s, OpenSpan)] + closes = [s for s in sigs if isinstance(s, CloseSpan)] 
+ assert {o.key for o in opens} == {"a", "b"} + assert [c.key for c in closes] == ["b", "a"] + assert all(c.is_complete for c in closes) + + +def test_streamed_args_accumulate_into_open_input(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", author="agent", tool_call_id="c", name="Bash", arguments={} + ), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ToolRequestDelta(type="tool_request", tool_call_id="c", name="Bash", arguments_delta='{"cmd":'), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ToolRequestDelta(type="tool_request", tool_call_id="c", name="Bash", arguments_delta='"ls"}'), + ), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="c", kind="tool", name="Bash", input={"cmd": "ls"}) + + +def test_unclosed_tool_closed_incomplete_on_flush(): + d = SpanDeriver() + events = [ + _tool_req(0, "x", "Bash", {}), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="x", kind="tool", name="Bash", input={}) + assert sigs[1] == CloseSpan(key="x", output=None, is_complete=False) + + +def test_none_index_is_skipped(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", + index=None, + content=ToolRequestContent( + type="tool_request", author="agent", tool_call_id="n", name="Bash", arguments={} + ), + ), + StreamTaskMessageDone(type="done", index=None), + ] + assert _signals(d, events) == [] + + +def test_orphan_tool_response_ignored(): + d = SpanDeriver() + events = [ + StreamTaskMessageFull( + type="full", + index=0, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="z", name="Bash", content="r" + ), + ), + ] + assert _signals(d, events) == [] + + +def test_full_tool_request_opens_span(): + """Full(ToolRequestContent) must open a tool span (for LangGraph-style 
harnesses).""" + d = SpanDeriver() + events = [ + StreamTaskMessageFull( + type="full", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_x", + name="Bash", + arguments={"cmd": "ls"}, + ), + ), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="call_x", kind="tool", name="Bash", input={"cmd": "ls"}) + assert sigs[1] == CloseSpan(key="call_x", output=None, is_complete=False) + + +def test_full_tool_request_and_response_paired(): + """Full(ToolRequestContent) + Full(ToolResponseContent) produces a complete span pair.""" + d = SpanDeriver() + events = [ + StreamTaskMessageFull( + type="full", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_y", + name="Grep", + arguments={}, + ), + ), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="call_y", + name="Grep", + content="result", + ), + ), + ] + sigs = _signals(d, events) + assert sigs == [ + OpenSpan(key="call_y", kind="tool", name="Grep", input={}), + CloseSpan(key="call_y", output="result", is_complete=True), + ] + + +def test_full_tool_request_does_not_double_open(): + """A Full(ToolRequestContent) for an already-open tool_call_id is a no-op.""" + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_z", + name="X", + arguments={}, + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_z", + name="X", + arguments={}, + ), + ), + ] + sigs = _signals(d, events) + opens = [s for s in sigs if isinstance(s, OpenSpan)] + assert len(opens) == 1 + assert opens[0].key == "call_z" diff --git a/tests/lib/core/harness/test_tracer.py 
b/tests/lib/core/harness/test_tracer.py new file mode 100644 index 000000000..ed40cf595 --- /dev/null +++ b/tests/lib/core/harness/test_tracer.py @@ -0,0 +1,93 @@ +from typing import override + +import pytest + +from agentex.lib.core.harness.types import OpenSpan, CloseSpan +from agentex.lib.core.harness.tracer import SpanTracer + + +class _FakeSpan: + def __init__(self, name): + self.name = name + self.output = None + self.data = None + + +class _FakeTracing: + def __init__(self): + self.started = [] + self.ended = [] + self.ended_spans = [] + + async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): + self.started.append((name, parent_id, input)) + return _FakeSpan(name) + + async def end_span(self, *, trace_id, span): + self.ended.append((span.name, span.output)) + self.ended_spans.append(span) + + +@pytest.mark.asyncio +async def test_open_then_close_starts_and_ends_span(): + fake = _FakeTracing() + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"})) + await tracer.handle(CloseSpan(key="call_1", output="files", is_complete=True)) + assert fake.started == [("Bash", "p1", {"cmd": "ls"})] + assert fake.ended == [("Bash", "files")] + + +@pytest.mark.asyncio +async def test_close_records_is_error_on_span_data(): + """A CloseSpan carrying is_error records the status on span.data (AGX1-371).""" + fake = _FakeTracing() + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="call_err", kind="tool", name="Bash", input={})) + await tracer.handle(CloseSpan(key="call_err", output="boom", is_complete=True, is_error=True)) + assert fake.ended_spans[0].data == {"is_error": True} + + +@pytest.mark.asyncio +async def test_close_without_status_leaves_span_data_untouched(): + """is_error=None (no status reported) must not write to span.data.""" + fake = _FakeTracing() + tracer 
= SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="call_1", kind="tool", name="Bash", input={})) + await tracer.handle(CloseSpan(key="call_1", output="files", is_complete=True)) + assert fake.ended_spans[0].data is None + + +@pytest.mark.asyncio +async def test_no_trace_id_is_noop(): + fake = _FakeTracing() + tracer = SpanTracer(trace_id="", parent_span_id=None, tracing=fake) + await tracer.handle(OpenSpan(key="k", kind="tool", name="X")) + await tracer.handle(CloseSpan(key="k")) + assert fake.started == [] and fake.ended == [] + + +@pytest.mark.asyncio +async def test_tracing_failure_is_swallowed(): + class _Boom(_FakeTracing): + @override + async def start_span(self, **kw): + raise RuntimeError("backend down") + + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=_Boom()) + # Must not raise. + await tracer.handle(OpenSpan(key="k", kind="tool", name="X")) + await tracer.handle(CloseSpan(key="k")) + assert tracer._open == {} + + +@pytest.mark.asyncio +async def test_duplicate_open_replaces_silently(): + fake = _FakeTracing() + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="k", kind="tool", name="A")) + await tracer.handle(OpenSpan(key="k", kind="tool", name="B")) + await tracer.handle(CloseSpan(key="k")) + # Both opens started spans, but only the second ("B") is closed. 
+ assert [name for name, _, _ in fake.started] == ["A", "B"] + assert fake.ended == [("B", None)] diff --git a/tests/lib/core/harness/test_types.py b/tests/lib/core/harness/test_types.py new file mode 100644 index 000000000..68bc89ce2 --- /dev/null +++ b/tests/lib/core/harness/test_types.py @@ -0,0 +1,53 @@ +from typing import AsyncIterator + +from agentex.lib.core.harness.types import ( + OpenSpan, + CloseSpan, + TurnUsage, + TurnResult, + HarnessTurn, + StreamTaskMessage, +) + + +def test_open_close_span_construct(): + o = OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"}) + c = CloseSpan(key="call_1", output="files", is_complete=True) + assert o.key == c.key == "call_1" + assert o.kind == "tool" + assert c.is_complete is True + + +def test_turn_usage_defaults_are_none(): + u = TurnUsage(model="claude-opus-4-6") + assert u.model == "claude-opus-4-6" + assert u.input_tokens is None + assert u.num_tool_calls == 0 + + +def test_turn_result_wraps_usage(): + r = TurnResult(final_text="hi", usage=TurnUsage(model="m")) + assert r.final_text == "hi" + assert r.usage.model == "m" + + +def test_close_span_defaults(): + c = CloseSpan(key="x") + assert c.output is None + assert c.is_complete is True + + +def test_harness_turn_runtime_check(): + class _Turn: + @property + def events(self) -> AsyncIterator[StreamTaskMessage]: + async def _gen() -> AsyncIterator[StreamTaskMessage]: + if False: + yield # pragma: no cover + + return _gen() + + def usage(self) -> TurnUsage: + return TurnUsage(model="m") + + assert isinstance(_Turn(), HarnessTurn) is True diff --git a/tests/lib/core/harness/test_yield_delivery.py b/tests/lib/core/harness/test_yield_delivery.py new file mode 100644 index 000000000..f3f491d84 --- /dev/null +++ b/tests/lib/core/harness/test_yield_delivery.py @@ -0,0 +1,89 @@ +import types as _types + +import pytest + +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + 
StreamTaskMessageFull, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.core.harness.yield_delivery import yield_events + + +class _RecordTracing: + def __init__(self): + self.started, self.ended = [], [] + + async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): + self.started.append(name) + return _types.SimpleNamespace() # supports arbitrary attribute assignment (span.output = ...) + + async def end_span(self, *, trace_id, span): + self.ended.append(getattr(span, "output", None)) + + +async def _gen(events): + for e in events: + yield e + + +@pytest.mark.asyncio +async def test_yield_passes_events_through_and_traces(): + fake = _RecordTracing() + tracer = SpanTracer(trace_id="t", parent_span_id="p", tracing=fake) + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", author="agent", tool_call_id="c", name="Bash", arguments={} + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="c", name="Bash", content="ok" + ), + ), + ] + out = [e async for e in yield_events(_gen(events), tracer=tracer)] + assert out == events # passthrough unchanged + assert fake.started == ["Bash"] # span derived + opened + assert fake.ended == ["ok"] # span closed with response + + +@pytest.mark.asyncio +async def test_yield_without_tracer_is_pure_passthrough(): + events = [ + StreamTaskMessageDone(type="done", index=0), + ] + out = [e async for e in yield_events(_gen(events), tracer=None)] + assert out == events + + +@pytest.mark.asyncio +async def test_flush_runs_on_early_close(): + fake = _RecordTracing() + tracer = SpanTracer(trace_id="t", parent_span_id="p", tracing=fake) + events = [ + StreamTaskMessageStart( 
+ type="start", + index=0, + content=ToolRequestContent( + type="tool_request", author="agent", tool_call_id="c", name="Bash", arguments={} + ), + ), + StreamTaskMessageDone(type="done", index=0), + # response intentionally never arrives + ] + gen = yield_events(_gen(events), tracer=tracer) + first = await gen.__anext__() # Start + second = await gen.__anext__() # Done -> tool span opens here + await gen.aclose() # triggers the finally -> flush() + assert fake.started == ["Bash"] + assert fake.ended == [None] # flush closed the unpaired span (incomplete, no output)