From a84f83b7aa6d80673709f07e77bb5c9a122f0430 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 09:48:16 -0400 Subject: [PATCH 1/2] test(harness): shared FakeSpan/FakeTracing doubles + conformance determinism fix Extract tests/lib/core/harness/_fakes.py (FakeSpan/FakeTracing), removing ~9 duplicated copies, and harden the conformance determinism test: it now iterates all_fixtures() at runtime (paired with a conftest that eagerly imports every per-harness conformance module) instead of import-time parametrization, which had silently dropped per-harness coverage (Greptile P1). Also removes the now-landed pr4-pydantic-ai planning doc. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...unified-harness-surface-pr4-pydantic-ai.md | 246 ------------------ tests/lib/core/harness/_fakes.py | 63 +++++ .../lib/core/harness/conformance/conftest.py | 21 ++ tests/lib/core/harness/conformance/runner.py | 54 +--- .../conformance/test_codex_conformance.py | 13 +- .../harness/conformance/test_conformance.py | 38 ++- .../conformance/test_langgraph_conformance.py | 13 +- .../test_pydantic_ai_conformance.py | 24 +- tests/lib/core/harness/test_auto_send.py | 27 +- tests/lib/core/harness/test_emitter.py | 10 +- tests/lib/core/harness/test_tracer.py | 34 +-- tests/lib/core/harness/test_yield_delivery.py | 26 +- 12 files changed, 156 insertions(+), 413 deletions(-) delete mode 100644 docs/superpowers/plans/2026-06-18-unified-harness-surface-pr4-pydantic-ai.md create mode 100644 tests/lib/core/harness/_fakes.py create mode 100644 tests/lib/core/harness/conformance/conftest.py diff --git a/docs/superpowers/plans/2026-06-18-unified-harness-surface-pr4-pydantic-ai.md b/docs/superpowers/plans/2026-06-18-unified-harness-surface-pr4-pydantic-ai.md deleted file mode 100644 index 2fa1892fe..000000000 --- a/docs/superpowers/plans/2026-06-18-unified-harness-surface-pr4-pydantic-ai.md +++ /dev/null @@ -1,246 +0,0 @@ -# Unified Harness Surface — PR 4: pydantic-ai Migration Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Migrate the pydantic-ai harness onto the unified harness surface so it emits streaming + persisted messages + tracing + turn usage through ONE source of truth, over both delivery channels (yield + auto-send), with no public regression — and ship its 3 integration test agents (sync/async/temporal). - -**Architecture:** Wrap a pydantic-ai run as a `HarnessTurn` (canonical `StreamTaskMessage*` stream + normalized `TurnUsage`). Reuse the existing `convert_pydantic_ai_to_agentex_events` mapping as the tap. Reimplement the existing public auto-send helper on top of `UnifiedEmitter.auto_send_turn`, and route sync ACP agents through `UnifiedEmitter.yield_turn`. Retire the bespoke `_pydantic_ai_tracing` handler in favor of the surface's derived spans (keep the old symbol as a deprecated shim). - -**Tech Stack:** Python 3, pydantic-ai (`pydantic_ai`), pydantic v2, pytest + pytest-asyncio, the `agentex.lib.core.harness` package from PRs 1-3. - -**Foundation:** `src/agentex/lib/core/harness/` (`UnifiedEmitter`, `SpanTracer`, `SpanDeriver`, `HarnessTurn`, `TurnUsage`, `TurnResult`, `yield_events`, `auto_send`, conformance scaffold). Design: `docs/superpowers/specs/2026-06-18-unified-harness-surface-design.md`. - ---- - -## Dependencies (must land first) - -- **AGX1-373** — cross-channel conformance equivalence + `Full` wire reconciliation. PR 4's conformance fixtures register into the upgraded cross-channel runner. **Do not start Task 6 until 373 is merged into the foundation branch.** -- **AGX1-375** — public `adk` import path for the harness surface. If merged, import the surface via the public path in this PR; if not, import from `agentex.lib.core.harness` and add a follow-up note. (Tasks below assume `from agentex.lib.core.harness import UnifiedEmitter, TurnUsage, ...`; swap to the public path if 375 landed.) - -This is one PR (target < 1000 lines code, excluding any recorded fixtures). The 3 test agents are the largest chunk; if the diff exceeds budget, split the test agents into a follow-up PR 4b (note in the PR description). - ---- - -## File Structure - -- Modify `src/agentex/lib/adk/_modules/_pydantic_ai_sync.py` — add an optional `on_result` callback to `convert_pydantic_ai_to_agentex_events` (additive) so usage can be captured. Behavior unchanged when omitted. -- Create `src/agentex/lib/adk/_modules/_pydantic_ai_turn.py` — `PydanticAITurn(HarnessTurn)` + `pydantic_ai_usage_to_turn_usage(...)`. -- Modify `src/agentex/lib/adk/_modules/_pydantic_ai_async.py` — reimplement `stream_pydantic_ai_events` on `UnifiedEmitter.auto_send_turn`, preserving signature + return. -- Modify `src/agentex/lib/adk/_modules/_pydantic_ai_tracing.py` — mark `create_pydantic_ai_tracing_handler` / `AgentexPydanticAITracingHandler` deprecated (docstring + `DeprecationWarning`); keep importable. -- Create `tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py` — register pydantic-ai fixtures into the cross-channel conformance runner. -- Create `examples/tutorials/harness-pydantic-ai-{sync,async,temporal}/` — 3 test agents (modeled on the `sync-pydantic-ai` / `default-pydantic-ai` / `temporal-pydantic-ai` CLI templates) using the unified surface. -- Modify `.github/workflows/harness-integration.yml` — enable the pydantic-ai rows of the `live-matrix` job. -- Modify `.github/workflows/agentex-tutorials-test.yml` (or its agent list) — include the 3 new test agents if that workflow enumerates agents. - ---- - -## Task 1: Expose the pydantic-ai run result for usage capture - -**Files:** -- Modify: `src/agentex/lib/adk/_modules/_pydantic_ai_sync.py` -- Test: `tests/lib/adk/test_pydantic_ai_sync.py` (create if absent) - -The converter already iterates the pydantic-ai event stream and currently *ignores* `AgentRunResultEvent` (the terminal event carrying the run result + usage). Add an optional callback so a caller can capture it without changing existing behavior. - -- [ ] **Step 1: Write the failing test.** - -```python -import pytest -from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events - - -class _FakeResultEvent: # stand-in for pydantic_ai.run.AgentRunResultEvent - def __init__(self, result): - self.result = result - - -async def _stream(events): - for e in events: - yield e - - -@pytest.mark.asyncio -async def test_on_result_callback_receives_terminal_event(monkeypatch): - # When the stream ends with an AgentRunResultEvent, on_result is invoked with it, - # and the converter still yields no extra events for it. - captured = {} - # Use a real AgentRunResultEvent if constructable; otherwise patch isinstance check. - # (Implementer: see Step 3 note — match the real terminal event type.) - ... -``` - -Implementer note: the exact terminal event type is `pydantic_ai.run.AgentRunResultEvent` (already imported in `_pydantic_ai_sync.py`). Write the test to feed a stream ending in a real `AgentRunResultEvent` (construct it as the installed pydantic-ai version requires; inspect `python -c "import pydantic_ai.run, inspect; print(inspect.signature(pydantic_ai.run.AgentRunResultEvent))"`). Assert `on_result` is called once with that event and that the converter yields the same `StreamTaskMessage*` sequence as without the callback (no behavior change for the streaming output). - -- [ ] **Step 2: Run** `uv run pytest tests/lib/adk/test_pydantic_ai_sync.py -v` — expect FAIL (no `on_result` param). - -- [ ] **Step 3: Implement.** Add `on_result: Callable[[AgentRunResultEvent], None] | None = None` (and an async-callable variant if needed) to `convert_pydantic_ai_to_agentex_events`. In the existing `elif isinstance(event, (FunctionToolCallEvent, FinalResultEvent, AgentRunResultEvent))` branch, when the event is an `AgentRunResultEvent` and `on_result` is set, call it (await if it's a coroutine). Keep yielding nothing for it. No other change. - -- [ ] **Step 4: Run** the test — expect PASS, plus run the existing `_pydantic_ai_sync` tests if any to confirm no regression. - -- [ ] **Step 5: Commit** `feat(pydantic-ai): optional on_result callback to expose run result for usage capture`. - ---- - -## Task 2: Normalize pydantic-ai usage to `TurnUsage` - -**Files:** -- Create: `src/agentex/lib/adk/_modules/_pydantic_ai_turn.py` -- Test: `tests/lib/adk/test_pydantic_ai_turn.py` - -- [ ] **Step 1: Verify the real usage shape FIRST.** Run `uv run python -c "from pydantic_ai.usage import RunUsage; import inspect; print([f for f in RunUsage.model_fields])"` (the type/name may be `RunUsage` or `Usage` depending on the installed version). Record the exact field names (commonly: `input_tokens`, `output_tokens`, `total_tokens`, `requests`, and a cache/`details` field). The mapping in Step 3 MUST use the real field names. - -- [ ] **Step 2: Write the failing test.** - -```python -from agentex.lib.adk._modules._pydantic_ai_turn import pydantic_ai_usage_to_turn_usage - - -def test_usage_normalization_maps_fields(): - # Build a usage object matching the installed pydantic-ai RunUsage shape - # (see Task 2 Step 1 for the real fields), then assert the mapping. - usage_obj = ... # construct RunUsage(input_tokens=10, output_tokens=20, requests=2, ...) - tu = pydantic_ai_usage_to_turn_usage(usage_obj, model="openai:gpt-4o") - assert tu.model == "openai:gpt-4o" - assert tu.input_tokens == 10 - assert tu.output_tokens == 20 - assert tu.num_llm_calls == 2 -``` - -- [ ] **Step 3: Implement** `pydantic_ai_usage_to_turn_usage(usage, model) -> TurnUsage` mapping the verified RunUsage fields onto `TurnUsage` (`input_tokens`, `output_tokens`, `total_tokens`, `cached_input_tokens` if available, `num_llm_calls` ← `requests`). Use `getattr(usage, "", None)` defensively so a version field rename degrades to `None` rather than crashing. Then implement `PydanticAITurn`: - -```python -class PydanticAITurn: - """A pydantic-ai run as a HarnessTurn: canonical event stream + normalized usage.""" - - def __init__(self, stream, model: str | None = None): - self._stream = stream - self._model = model - self._usage = TurnUsage(model=model) - - @property - async def events(self): - def _capture(result_event): - run_result = getattr(result_event, "result", None) - usage_obj = run_result.usage() if run_result is not None else None - if usage_obj is not None: - self._usage = pydantic_ai_usage_to_turn_usage(usage_obj, self._model) - async for ev in convert_pydantic_ai_to_agentex_events(self._stream, on_result=_capture): - yield ev - - def usage(self) -> TurnUsage: - return self._usage -``` - -(Verify `run_result.usage()` is the correct accessor for the installed version; adjust if it's an attribute.) - -- [ ] **Step 4: Add a `PydanticAITurn` test** that feeds a small stream ending in an `AgentRunResultEvent` whose `result.usage()` returns a known usage, drives `turn.events` to exhaustion, then asserts `turn.usage()` reflects the normalized values and that `events` yielded the expected `StreamTaskMessage*`. Confirm `usage()` BEFORE exhaustion returns the default (documented single-pass contract). - -- [ ] **Step 5: Run** the tests — expect PASS. - -- [ ] **Step 6: Commit** `feat(pydantic-ai): PydanticAITurn HarnessTurn + usage normalization`. - ---- - -## Task 3: Reimplement the auto-send helper on the unified surface - -**Files:** -- Modify: `src/agentex/lib/adk/_modules/_pydantic_ai_async.py` -- Test: `tests/lib/adk/test_pydantic_ai_async.py` - -`stream_pydantic_ai_events(stream, task_id, ...)` currently hand-drives `adk.streaming`. Reimplement it to delegate to `UnifiedEmitter.auto_send_turn(PydanticAITurn(stream, model))`, preserving its signature and return value (the accumulated final text). Feature-add: traces by default. - -- [ ] **Step 1: Capture current behavior as a characterization test.** Before changing anything, write a test that runs the CURRENT `stream_pydantic_ai_events` over a fixture stream with a fake `adk.streaming` and records the messages produced (text, tool request/response). This is the backward-compat baseline ("equivalent messages before/after" from the design). - -- [ ] **Step 2: Run** it green against the current implementation. Commit the test alone: `test(pydantic-ai): characterize stream_pydantic_ai_events output`. - -- [ ] **Step 3: Reimplement** `stream_pydantic_ai_events` to build a `PydanticAITurn` and call `UnifiedEmitter(task_id=task_id, trace_id=, parent_span_id=, streaming=).auto_send_turn(turn)`, returning `result.final_text`. Resolve `trace_id`/`parent_span_id` the same way the module does today (from the streaming/tracing context vars it already reads). Preserve the exact public signature and return type. - -- [ ] **Step 4: Run** the characterization test — it must still pass (same messages). Adjust the test only if AGX1-373 deliberately changed the tool-message wire shape; in that case assert the post-373 shape and note it. Confirm tracing now occurs by default (assert spans via a fake tracer). - -- [ ] **Step 5: Commit** `refactor(pydantic-ai): reimplement stream_pydantic_ai_events on UnifiedEmitter (default tracing)`. - ---- - -## Task 4: Route sync ACP delivery through the surface + deprecate the bespoke tracing handler - -**Files:** -- Modify: `src/agentex/lib/adk/_modules/_pydantic_ai_tracing.py` -- (Reference) the sync ACP usage pattern in the pydantic-ai docs/templates. - -- [ ] **Step 1: Deprecate the bespoke tracing handler.** Add a `DeprecationWarning` (via `warnings.warn(...)`) and a docstring note to `create_pydantic_ai_tracing_handler` / `AgentexPydanticAITracingHandler` stating the unified surface (`UnifiedEmitter`, which derives spans from the canonical stream) supersedes it. Keep the symbols importable and functional (no removal — backward compat). - -- [ ] **Step 2: Confirm the sync path.** The sync tap remains `convert_pydantic_ai_to_agentex_events`. Document (in the module docstring of `_pydantic_ai_sync.py`) the recommended sync ACP usage: - -```python -turn = PydanticAITurn(agent.run_stream_events(...), model=...) -async for event in emitter.yield_turn(turn): - yield event -``` - -No code change beyond the docstring (the sync converter already yields the canonical stream; `yield_turn` adds tracing). Add a test that `emitter.yield_turn(PydanticAITurn(...))` forwards the same events the bare converter would and derives spans. - -- [ ] **Step 3: Run** tests; **Commit** `refactor(pydantic-ai): deprecate bespoke tracing handler; document unified sync path`. - ---- - -## Task 5: pydantic-ai cross-channel conformance fixtures - -**Files:** -- Create: `tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py` - -**Blocked by AGX1-373** (the cross-channel conformance runner). Once 373 is merged into the foundation branch: - -- [ ] **Step 1: Record canonical fixtures.** For 3-4 representative pydantic-ai runs (text-only; single tool; reasoning/thinking; multi-step text+tool), capture the `StreamTaskMessage*` sequence the tap produces (run `convert_pydantic_ai_to_agentex_events` over recorded `AgentStreamEvent` inputs, or hand-author the canonical sequences). Store as `Fixture(name=..., events=[...])`. - -- [ ] **Step 2: Register** each fixture with the conformance runner and let the cross-channel parametrized test (from AGX1-373) assert yield-vs-auto-send equivalence + span equivalence for each. Register/parametrize within THIS module (per the runner's documented per-module registry semantics). - -- [ ] **Step 3: Run** `./scripts/test tests/lib/core/harness/ -v` — all green. **Commit** `test(pydantic-ai): cross-channel conformance fixtures`. - ---- - -## Task 6: Three integration test agents (sync / async / temporal) - -**Files:** -- Create: `examples/tutorials/harness-pydantic-ai-sync/` , `…-async/` , `…-temporal/` (each a minimal Agentex agent). -- Modify: `.github/workflows/harness-integration.yml` (enable pydantic-ai `live-matrix` rows). -- Modify: `.github/workflows/agentex-tutorials-test.yml` if it enumerates agents. - -Each agent is the smallest agent that exercises one delivery channel through the unified surface with the pydantic-ai harness. - -- [ ] **Step 1: Scaffold from the existing templates.** Base each agent on the corresponding CLI template: `sync-pydantic-ai`, `default-pydantic-ai` (async), `temporal-pydantic-ai` (under `src/agentex/lib/cli/templates/`). In each, the message handler builds `PydanticAITurn(agent.run_stream_events(params.content.content), model=...)` and: - - sync agent: `async for ev in emitter.yield_turn(turn): yield ev` - - async + temporal agents: `await emitter.auto_send_turn(turn)` (temporal: inside the activity, as the template already structures it). - Use a tiny pydantic-ai agent with ONE trivial tool so the run exercises text + a tool call + tool response. - -- [ ] **Step 2: Write an integration test per agent** that drives it with a fixed prompt and asserts: valid ordered messages (text + tool request + tool response) and a well-formed span tree. Use the repo's existing tutorial-agent test harness pattern (see `agentex-tutorials-test.yml` and how current tutorial agents are tested). - -- [ ] **Step 3: Wire CI.** In `.github/workflows/harness-integration.yml`, replace the `if: false` placeholder `live-matrix` job (or add a real matrix) with the pydantic-ai × {sync, async, temporal} entries, each running its agent's integration test. If `agentex-tutorials-test.yml` enumerates agents, add the three there too. `log`/document any agent-type not covered (none expected for pydantic-ai). - -- [ ] **Step 4: Run** the integration tests locally (as far as the env allows) and the conformance + unit suites. **Commit** `test(pydantic-ai): sync/async/temporal integration agents + enable CI live-matrix rows`. - ---- - -## Task 7: Full suite, type check, and backward-compat audit - -- [ ] **Step 1:** `./scripts/test tests/lib/core/harness/ tests/lib/adk/ -v` — all green on 3.12 + 3.13. -- [ ] **Step 2:** `uv run pyright src/agentex/lib/` (or the harness + pydantic modules) — 0 new errors. -- [ ] **Step 3: Backward-compat audit.** Confirm the public signatures are unchanged: `convert_pydantic_ai_to_agentex_events` (only gained an optional kwarg), `stream_pydantic_ai_events` (same signature + return), `create_pydantic_ai_tracing_handler` (still importable, now warns). Grep the repo + templates for callers and confirm none broke. -- [ ] **Step 4:** If any fix was needed, **Commit** `chore(pydantic-ai): type/back-compat fixes`. - ---- - -## Self-Review checklist (run before opening the PR) - -- Every public symbol that existed before still exists with the same signature (additive-only): `convert_pydantic_ai_to_agentex_events`, `stream_pydantic_ai_events`, `create_pydantic_ai_tracing_handler`. -- The auto-send helper returns the same final text as before (characterization test passes, or the post-373 shape is asserted with a note). -- Tracing is now on by default for both channels and is overridable (emitter `tracer=False`). -- Usage normalization uses the REAL pydantic-ai usage field names (verified in Task 2 Step 1), with defensive `getattr`. -- Conformance fixtures register per-module and pass the cross-channel assertion from AGX1-373. -- 3 test agents exist and their CI rows are enabled. -- No `# type: ignore` added without justification. - -## Notes for the PR description - -- Link AGX1-373 (dependency) and AGX1-375 (import path); note AGX1-374 (reasoning/mixed-ordering auto_send tests) is foundation-level and orthogonal. -- State the diff size; if test agents pushed it over budget, note the PR 4b split. -- This is the template the langgraph (PR 5) and openai (PR 6) migrations follow. diff --git a/tests/lib/core/harness/_fakes.py b/tests/lib/core/harness/_fakes.py new file mode 100644 index 000000000..f9fd34a45 --- /dev/null +++ b/tests/lib/core/harness/_fakes.py @@ -0,0 +1,63 @@ +"""Shared test doubles for the unified harness test suites. + +A single superset implementation of the in-memory tracing backend used across +the harness tests. Three recording shapes were previously duplicated: + +- Shape-1 (richest): ``started`` = ``(name, parent_id, input)`` 3-tuples, + ``ended`` = ``(name, output)`` 2-tuples, plus an ``ended_spans`` list of the + closed ``FakeSpan`` objects (which carry ``.name``, ``.output``, ``.data``). +- Shape-2: ``started`` = ``(name, parent_id)`` 2-tuples, ``ended`` = + ``(name, output)``. +- Shape-3: ``started`` = bare names, ``ended`` = bare outputs. + +``FakeTracing`` records the richest (shape-1) form and exposes read-only +convenience properties (``started_names``, ``started_pairs``, +``ended_outputs``) so shape-2 and shape-3 assertions stay clean. +""" + +from __future__ import annotations + +from typing import Any + + +class FakeSpan: + def __init__(self, name: str) -> None: + self.name = name + self.output: Any = None + self.data: Any = None + + +class FakeTracing: + def __init__(self) -> None: + self.started: list[tuple[str, Any, Any]] = [] + self.ended: list[tuple[str, Any]] = [] + self.ended_spans: list[FakeSpan] = [] + + async def start_span( + self, + *, + trace_id: str, + name: str, + input: Any = None, + parent_id: Any = None, + data: Any = None, + task_id: Any = None, + ) -> FakeSpan: + self.started.append((name, parent_id, input)) + return FakeSpan(name) + + async def end_span(self, *, trace_id: str, span: FakeSpan) -> None: + self.ended.append((span.name, span.output)) + self.ended_spans.append(span) + + @property + def started_names(self) -> list[str]: + return [name for (name, _parent, _input) in self.started] + + @property + def started_pairs(self) -> list[tuple[str, Any]]: + return [(name, parent) for (name, parent, _input) in self.started] + + @property + def ended_outputs(self) -> list[Any]: + return [output for (_name, output) in self.ended] diff --git a/tests/lib/core/harness/conformance/conftest.py b/tests/lib/core/harness/conformance/conftest.py new file mode 100644 index 000000000..e4da7f1e2 --- /dev/null +++ b/tests/lib/core/harness/conformance/conftest.py @@ -0,0 +1,21 @@ +"""Conformance-suite test setup. + +Eagerly import every per-harness conformance module so each one's module-level +``register(...)`` calls run before any test executes. This makes +``all_fixtures()`` complete and independent of pytest's collection/import order +(the runner documents that cross-module registration order is not guaranteed), +so the cross-harness ``test_span_derivation_is_deterministic`` guard in +``test_conformance.py`` covers the full fixture set even when this directory is +run in isolation. +""" + +from __future__ import annotations + +# Importing these for their registration side effects only. +from . import ( + test_codex_conformance, # noqa: F401 + test_openai_conformance, # noqa: F401 + test_langgraph_conformance, # noqa: F401 + test_claude_code_conformance, # noqa: F401 + test_pydantic_ai_conformance, # noqa: F401 +) diff --git a/tests/lib/core/harness/conformance/runner.py b/tests/lib/core/harness/conformance/runner.py index 84e84fa51..e6928669a 100644 --- a/tests/lib/core/harness/conformance/runner.py +++ b/tests/lib/core/harness/conformance/runner.py @@ -43,8 +43,8 @@ because: - StreamingTaskMessageContext.close() persists initial_content when no deltas have been streamed, so the message IS correctly persisted. - - It mirrors the pattern already used by the real _langgraph_async.py harness, - keeping behavioural parity. + - It mirrors the pattern already used by the real langgraph streaming helper + (now in _langgraph_turn.py), keeping behavioural parity. - Switching to adk.messages.create would require an additional injectable dependency, adding surface area for no observable benefit. The conformance test treats this as an ACCEPTABLE envelope difference: at the @@ -53,18 +53,14 @@ identical because both adapters drive the same SpanDeriver.observe() call sequence and forward every signal to their tracer. -AGX1-377 fix: auto_send now DELIVERS streamed tool-request messages (Start+Done) -instead of dropping them. The conformance normaliser previously suppressed the -delivery for Start(tool_request)+Done on the yield channel to match auto_send's -old drop behaviour. That suppression is now removed: both channels produce a -LogicalDelivery for a streamed tool_request, and the cross-channel assertion -verifies it is delivered on both. +auto_send DELIVERS streamed tool-request messages (Start+Done): both channels +produce a LogicalDelivery for a streamed tool_request, and the cross-channel +assertion verifies it is delivered on both. """ from __future__ import annotations import json -import types as _types from typing import Any, NamedTuple, override from dataclasses import dataclass @@ -81,6 +77,8 @@ from agentex.types.reasoning_content_delta import ReasoningContentDelta from agentex.lib.core.harness.span_derivation import SpanDeriver +from .._fakes import FakeTracing + @dataclass class Fixture: @@ -145,8 +143,8 @@ def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDe - reasoning: initial_content.summary joined (from Start) prepended to accumulated reasoning-content deltas (this catches a channel that drops the summary) - - tool_request: JSON-sorted arguments from the Start content (AGX1-377: now - delivered on both channels, no longer suppressed) + - tool_request: JSON-sorted arguments from the Start content (delivered on + both channels) - tool_response: str(content) from Full event """ from agentex.types.text_content import TextContent @@ -191,9 +189,9 @@ def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDe ) ) elif ctype == "tool_request" and isinstance(content, ToolRequestContent): - # AGX1-377 fix: auto_send now delivers streamed tool-request - # messages. Emit a delivery here so the cross-channel - # assertion verifies it is present on both channels. + # auto_send delivers streamed tool-request messages. Emit a + # delivery here so the cross-channel assertion verifies it is + # present on both channels. deliveries.append( LogicalDelivery( content_type=ctype, @@ -296,30 +294,6 @@ def streaming_task_message_context( return _FakeCtx(self.sink, ctype, initial_content) -class _FakeTracing: - """Minimal tracing backend: records started/ended span names + outputs.""" - - def __init__(self) -> None: - self.started: list[str] = [] - self.ended: list[Any] = [] - - async def start_span( - self, - *, - trace_id: str, - name: str, - input: Any = None, - parent_id: Any = None, - data: Any = None, - task_id: Any = None, - ) -> Any: - self.started.append(name) - return _types.SimpleNamespace() - - async def end_span(self, *, trace_id: str, span: Any) -> None: - self.ended.append(getattr(span, "output", None)) - - class _RecordingTracer(SpanTracer): """SpanTracer that records every SpanSignal it actually receives. @@ -486,7 +460,7 @@ async def run_cross_channel_conformance( from agentex.lib.core.harness.yield_delivery import yield_events # --- yield channel --- - tracer_yield = _RecordingTracer(tracing=_FakeTracing()) + tracer_yield = _RecordingTracer(tracing=FakeTracing()) yield_out = [e async for e in yield_events(_gen(fixture.events), tracer=tracer_yield)] # Span signals the yield channel actually emitted to its tracer @@ -496,7 +470,7 @@ async def run_cross_channel_conformance( yield_deliveries = _yield_text_reasoning_seq(_yield_logical_deliveries(yield_out)) # --- auto_send channel --- - tracer_auto = _RecordingTracer(tracing=_FakeTracing()) + tracer_auto = _RecordingTracer(tracing=FakeTracing()) fake_streaming = _FakeStreaming() await auto_send( _gen(fixture.events), diff --git a/tests/lib/core/harness/conformance/test_codex_conformance.py b/tests/lib/core/harness/conformance/test_codex_conformance.py index b00ed2970..b3db4f56e 100644 --- a/tests/lib/core/harness/conformance/test_codex_conformance.py +++ b/tests/lib/core/harness/conformance/test_codex_conformance.py @@ -19,7 +19,7 @@ from agentex.lib.core.harness.types import StreamTaskMessage from agentex.lib.adk._modules._codex_sync import convert_codex_to_agentex_events -from .runner import Fixture, register, derive_all +from .runner import Fixture, register async def _aiter(items: list[Any]) -> AsyncIterator[Any]: @@ -208,17 +208,6 @@ def _build(events: list[Any]) -> list[StreamTaskMessage]: _LOCAL_FIXTURES = [_CODEX_TEXT, _CODEX_TOOL, _CODEX_REASONING, _CODEX_MULTI] -@pytest.mark.parametrize("fixture", _LOCAL_FIXTURES, ids=lambda f: f.name) -def test_codex_span_derivation_is_deterministic(fixture: Fixture) -> None: - """Span derivation over codex events is deterministic (cross-channel guarantee). - - Deriving twice over the same events yields identical signals. This is the - invariant that makes ``yield`` and ``auto_send`` delivery equivalent: both - observe the same event stream, so their tracing side effects are identical. - """ - assert derive_all(fixture.events) == derive_all(fixture.events) - - @pytest.mark.parametrize("fixture", _LOCAL_FIXTURES, ids=lambda f: f.name) def test_codex_events_are_non_empty(fixture: Fixture) -> None: """Every codex fixture yields at least one StreamTaskMessage*.""" diff --git a/tests/lib/core/harness/conformance/test_conformance.py b/tests/lib/core/harness/conformance/test_conformance.py index 6d5f8ca66..7c79f9397 100644 --- a/tests/lib/core/harness/conformance/test_conformance.py +++ b/tests/lib/core/harness/conformance/test_conformance.py @@ -24,11 +24,9 @@ Full vs Start+Done envelope difference is a documented, acceptable choice in auto_send — see runner.py for the rationale). -AGX1-377 fix: auto_send now delivers streamed tool-request messages. The -suppression that previously prevented the yield normaliser from emitting a -LogicalDelivery for Start(tool_request)+Done is removed. Both channels now -produce a delivery for streamed tool_request, verified by the -"streamed-tool-request" fixture. +auto_send delivers streamed tool-request messages: both channels produce a +delivery for streamed tool_request, verified by the "streamed-tool-request" +fixture. """ from __future__ import annotations @@ -134,9 +132,8 @@ StreamTaskMessageDone(type="done", index=0), ], ), - # fixture 4: streamed tool_request (AGX1-377 fix) — tool_request delivered - # via Start+Done (no Full). auto_send now delivers this instead of dropping - # it. Both channels must produce a LogicalDelivery for this fixture. + # fixture 4: streamed tool_request — tool_request delivered via Start+Done + # (no Full). Both channels must produce a LogicalDelivery for this fixture. Fixture( name="streamed-tool-request", events=[ @@ -275,11 +272,28 @@ async def test_cross_channel_equivalence(fixture: Fixture) -> None: # --------------------------------------------------------------------------- -@pytest.mark.parametrize("fixture", all_fixtures(), ids=lambda f: f.name) -def test_span_derivation_is_deterministic(fixture: Fixture) -> None: - """Span derivation over the same event list is idempotent. +def test_span_derivation_is_deterministic() -> None: + """Span derivation over the same event list is idempotent, for EVERY + registered fixture across all harnesses. + + ``all_fixtures()`` is read at run time (not at collection/parametrize time) + so it sees fixtures registered by every conformance module, regardless of + import/collection order. The per-harness conformance modules are imported + eagerly via ``conftest.py`` in this directory, so this test covers the full + cross-harness fixture set even when run in isolation. (Parametrizing on + ``all_fixtures()`` at import time would freeze the set to whatever happened + to be registered before this module was collected.) Retained as a lightweight regression guard. The primary cross-channel guarantee is asserted in test_cross_channel_equivalence above. """ - assert derive_all(fixture.events) == derive_all(fixture.events) + fixtures = all_fixtures() + assert len(fixtures) > len(_FIXTURES), ( + "expected per-harness fixtures to be registered in addition to the " + f"{len(_FIXTURES)} generic ones; got {len(fixtures)} total — a conformance " + "module's fixtures are not being registered (check conftest imports)" + ) + for fixture in fixtures: + assert derive_all(fixture.events) == derive_all(fixture.events), ( + f"[{fixture.name}] span derivation is not deterministic" + ) diff --git a/tests/lib/core/harness/conformance/test_langgraph_conformance.py b/tests/lib/core/harness/conformance/test_langgraph_conformance.py index 721d6aac5..a8d43aef6 100644 --- a/tests/lib/core/harness/conformance/test_langgraph_conformance.py +++ b/tests/lib/core/harness/conformance/test_langgraph_conformance.py @@ -32,7 +32,7 @@ from agentex.types.tool_response_content import ToolResponseContent from agentex.types.reasoning_content_delta import ReasoningContentDelta -from .runner import Fixture, register, derive_all, run_cross_channel_conformance +from .runner import Fixture, register, run_cross_channel_conformance # --------------------------------------------------------------------------- # Fixtures @@ -216,14 +216,3 @@ async def test_cross_channel_equivalence(fixture: Fixture) -> None: assert yield_spans == auto_spans, ( f"[{fixture.name}] span signals differ:\n yield: {yield_spans}\n auto_send: {auto_spans}" ) - - -# --------------------------------------------------------------------------- -# Backward-compatible determinism guard -# --------------------------------------------------------------------------- - - -@pytest.mark.parametrize("fixture", _LANGGRAPH_FIXTURES, ids=lambda f: f.name) -def test_span_derivation_is_deterministic(fixture: Fixture) -> None: - """Span derivation over the same event list is idempotent.""" - assert derive_all(fixture.events) == derive_all(fixture.events) diff --git a/tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py b/tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py index ca8234fda..3594de474 100644 --- a/tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py +++ b/tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py @@ -8,8 +8,8 @@ Streamed tool requests ---------------------- The pydantic-ai stream emits a tool REQUEST as Start + ToolRequestDelta + Done (not a -Full event). AGX1-377 has landed: both the conformance runner and auto_send now deliver -the Start+Delta+Done(tool_request) shape, so the cross-channel test asserts full +Full event). Both the conformance runner and auto_send deliver the +Start+Delta+Done(tool_request) shape, so the cross-channel test asserts full delivery-equivalence for streamed tool requests. The fixtures below retain the ToolRequestDelta events as the streamed tool-request inputs. """ @@ -39,7 +39,6 @@ from .runner import ( Fixture, register, - derive_all, run_cross_channel_conformance, ) @@ -78,8 +77,8 @@ def _build_fixtures() -> list[Fixture]: # ------------------------------------------------------------------ # # 2. Single tool call + tool response. # The canonical stream emits Start+ToolRequestDelta+Done for the request - # and Full(ToolResponseContent) for the response. See AGX1-377 note above - # for why the request delivery is not yet asserted cross-channel. + # and Full(ToolResponseContent) for the response. Both are asserted + # delivery-equivalent cross-channel (see the module docstring). # ------------------------------------------------------------------ # tool_call_pydantic = [ PartStartEvent( @@ -170,8 +169,8 @@ async def test_cross_channel_equivalence(fixture: Fixture) -> None: """Assert that yield_events and auto_send produce equivalent logical deliveries and identical span signals for each pydantic-ai fixture. - See runner.py for the full contract. The AGX1-377 note at the top of this - module explains why streamed-tool-request delivery is not yet asserted. + See runner.py for the full contract, including streamed-tool-request + delivery equivalence. """ yield_deliveries, auto_deliveries, yield_spans, auto_spans = await run_cross_channel_conformance(fixture) @@ -181,14 +180,3 @@ async def test_cross_channel_equivalence(fixture: Fixture) -> None: assert yield_spans == auto_spans, ( f"[{fixture.name}] span signals differ:\n yield: {yield_spans}\n auto_send: {auto_spans}" ) - - -# --------------------------------------------------------------------------- -# Backward-compatible determinism guard -# --------------------------------------------------------------------------- - - -@pytest.mark.parametrize("fixture", _FIXTURES, ids=lambda f: f.name) -def test_span_derivation_is_deterministic(fixture: Fixture) -> None: - """Span derivation over the same event list is idempotent.""" - assert derive_all(fixture.events) == derive_all(fixture.events) diff --git a/tests/lib/core/harness/test_auto_send.py b/tests/lib/core/harness/test_auto_send.py index 1948e9196..764dae8b3 100644 --- a/tests/lib/core/harness/test_auto_send.py +++ b/tests/lib/core/harness/test_auto_send.py @@ -9,7 +9,6 @@ This mirrors _langgraph_async.py lines 62-78 and 100-127. """ -import types as _types from datetime import datetime import pytest @@ -29,6 +28,8 @@ from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent +from ._fakes import FakeTracing + class _FakeCtx: """Mirrors StreamingTaskMessageContext: __aenter__ opens (returns self with task_message set), @@ -181,21 +182,9 @@ async def test_auto_send_posts_full_tool_messages(): # --------------------------------------------------------------------------- -class _RecordTracing: - def __init__(self): - self.started, self.ended = [], [] - - async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): - self.started.append(name) - return _types.SimpleNamespace() - - async def end_span(self, *, trace_id, span): - self.ended.append(getattr(span, "output", None)) - - @pytest.mark.asyncio async def test_auto_send_derives_tool_spans_via_tracer(): - fake_tracing = _RecordTracing() + fake_tracing = FakeTracing() tracer = SpanTracer(trace_id="t", parent_span_id="p", tracing=fake_tracing) streaming = _FakeStreaming() @@ -228,8 +217,8 @@ async def test_auto_send_derives_tool_spans_via_tracer(): result = await auto_send(_gen(events), task_id="task1", tracer=tracer, streaming=streaming) assert result.final_text == "" - assert fake_tracing.started == ["Bash"] - assert fake_tracing.ended == ["ok"] + assert fake_tracing.started_names == ["Bash"] + assert fake_tracing.ended_outputs == ["ok"] # --------------------------------------------------------------------------- @@ -301,13 +290,13 @@ async def _exploding_gen(): # --------------------------------------------------------------------------- -# Test 6: streamed tool_request delivered (AGX1-377 core) +# Test 6: streamed tool_request delivered # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_auto_send_streams_tool_request(): - """A Start(ToolRequestContent) MUST open a streaming context (AGX1-377).""" + """A Start(ToolRequestContent) MUST open a streaming context.""" streaming = _FakeStreaming() events = [ StreamTaskMessageStart( @@ -457,7 +446,7 @@ async def test_auto_send_full_text_content_sets_final_text(): # --------------------------------------------------------------------------- -# Test 10: created_at is forwarded to streaming context (AGX1-378) +# Test 10: created_at is forwarded to streaming context # --------------------------------------------------------------------------- diff --git a/tests/lib/core/harness/test_emitter.py b/tests/lib/core/harness/test_emitter.py index df155ec44..3f70660ec 100644 --- a/tests/lib/core/harness/test_emitter.py +++ b/tests/lib/core/harness/test_emitter.py @@ -11,13 +11,7 @@ StreamTaskMessageStart, ) - -class _FakeTracing: - async def start_span(self, **kw): - return None - - async def end_span(self, **kw): - pass +from ._fakes import FakeTracing class _FakeCtx: @@ -84,7 +78,7 @@ async def test_emitter_yield_mode_passes_through(): async def test_emitter_tracing_default_on_when_trace_id_present(): # Inject a fake tracing backend so the test env doesn't need temporalio. # This exercises the default-on path (tracer=None) when trace_id is truthy. - emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id="p", tracing=_FakeTracing()) + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id="p", tracing=FakeTracing()) assert emitter.tracer is not None diff --git a/tests/lib/core/harness/test_tracer.py b/tests/lib/core/harness/test_tracer.py index ed40cf595..b3d9002c4 100644 --- a/tests/lib/core/harness/test_tracer.py +++ b/tests/lib/core/harness/test_tracer.py @@ -5,32 +5,12 @@ from agentex.lib.core.harness.types import OpenSpan, CloseSpan from agentex.lib.core.harness.tracer import SpanTracer - -class _FakeSpan: - def __init__(self, name): - self.name = name - self.output = None - self.data = None - - -class _FakeTracing: - def __init__(self): - self.started = [] - self.ended = [] - self.ended_spans = [] - - async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): - self.started.append((name, parent_id, input)) - return _FakeSpan(name) - - async def end_span(self, *, trace_id, span): - self.ended.append((span.name, span.output)) - self.ended_spans.append(span) +from ._fakes import FakeTracing @pytest.mark.asyncio async def test_open_then_close_starts_and_ends_span(): - fake = _FakeTracing() + fake = FakeTracing() tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) await tracer.handle(OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"})) await tracer.handle(CloseSpan(key="call_1", output="files", is_complete=True)) @@ -41,7 +21,7 @@ async def test_open_then_close_starts_and_ends_span(): @pytest.mark.asyncio async def test_close_records_is_error_on_span_data(): """A CloseSpan carrying is_error records the status on span.data (AGX1-371).""" - fake = _FakeTracing() + fake = FakeTracing() tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) await tracer.handle(OpenSpan(key="call_err", kind="tool", name="Bash", input={})) await tracer.handle(CloseSpan(key="call_err", output="boom", is_complete=True, is_error=True)) @@ -51,7 +31,7 @@ async def test_close_records_is_error_on_span_data(): @pytest.mark.asyncio async def test_close_without_status_leaves_span_data_untouched(): """is_error=None (no status reported) must not write to span.data.""" - fake = _FakeTracing() + fake = FakeTracing() tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) await tracer.handle(OpenSpan(key="call_1", kind="tool", name="Bash", input={})) await tracer.handle(CloseSpan(key="call_1", output="files", is_complete=True)) @@ -60,7 +40,7 @@ async def test_close_without_status_leaves_span_data_untouched(): @pytest.mark.asyncio async def test_no_trace_id_is_noop(): - fake = _FakeTracing() + fake = FakeTracing() tracer = SpanTracer(trace_id="", parent_span_id=None, tracing=fake) await tracer.handle(OpenSpan(key="k", kind="tool", name="X")) await tracer.handle(CloseSpan(key="k")) @@ -69,7 +49,7 @@ async def test_no_trace_id_is_noop(): @pytest.mark.asyncio async def test_tracing_failure_is_swallowed(): - class _Boom(_FakeTracing): + class _Boom(FakeTracing): @override async def start_span(self, **kw): raise RuntimeError("backend down") @@ -83,7 +63,7 @@ async def start_span(self, **kw): @pytest.mark.asyncio async def test_duplicate_open_replaces_silently(): - fake = _FakeTracing() + fake = FakeTracing() tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) await tracer.handle(OpenSpan(key="k", kind="tool", name="A")) await tracer.handle(OpenSpan(key="k", kind="tool", name="B")) diff --git a/tests/lib/core/harness/test_yield_delivery.py b/tests/lib/core/harness/test_yield_delivery.py index f3f491d84..ef3861a16 100644 --- a/tests/lib/core/harness/test_yield_delivery.py +++ b/tests/lib/core/harness/test_yield_delivery.py @@ -1,5 +1,3 @@ -import types as _types - import pytest from agentex.lib.core.harness.tracer import SpanTracer @@ -12,17 +10,7 @@ from agentex.types.tool_response_content import ToolResponseContent from agentex.lib.core.harness.yield_delivery import yield_events - -class _RecordTracing: - def __init__(self): - self.started, self.ended = [], [] - - async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): - self.started.append(name) - return _types.SimpleNamespace() # supports arbitrary attribute assignment (span.output = ...) - - async def end_span(self, *, trace_id, span): - self.ended.append(getattr(span, "output", None)) +from ._fakes import FakeTracing async def _gen(events): @@ -32,7 +20,7 @@ async def _gen(events): @pytest.mark.asyncio async def test_yield_passes_events_through_and_traces(): - fake = _RecordTracing() + fake = FakeTracing() tracer = SpanTracer(trace_id="t", parent_span_id="p", tracing=fake) events = [ StreamTaskMessageStart( @@ -53,8 +41,8 @@ async def test_yield_passes_events_through_and_traces(): ] out = [e async for e in yield_events(_gen(events), tracer=tracer)] assert out == events # passthrough unchanged - assert fake.started == ["Bash"] # span derived + opened - assert fake.ended == ["ok"] # span closed with response + assert fake.started_names == ["Bash"] # span derived + opened + assert fake.ended_outputs == ["ok"] # span closed with response @pytest.mark.asyncio @@ -68,7 +56,7 @@ async def test_yield_without_tracer_is_pure_passthrough(): @pytest.mark.asyncio async def test_flush_runs_on_early_close(): - fake = _RecordTracing() + fake = FakeTracing() tracer = SpanTracer(trace_id="t", parent_span_id="p", tracing=fake) events = [ StreamTaskMessageStart( @@ -85,5 +73,5 @@ async def test_flush_runs_on_early_close(): first = await gen.__anext__() # Start second = await gen.__anext__() # Done -> tool span opens here await gen.aclose() # triggers the finally -> flush() - assert fake.started == ["Bash"] - assert fake.ended == [None] # flush closed the unpaired span (incomplete, no output) + assert fake.started_names == ["Bash"] + assert fake.ended_outputs == [None] # flush closed the unpaired span (incomplete, no output) From 894f7200237e5121e384535e025a4ca4ec74f5e9 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 15:52:10 -0400 Subject: [PATCH 2/2] test(harness): build conformance fixtures with a loop-free driver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review: the codex and pydantic-ai conformance modules called asyncio.run() at import time, which raises RuntimeError when collected under an already-running event loop (programmatic pytest, notebooks) — so even a focused run could fail during collection. Hoist the loop-free driver from the claude-code module into runner.run_pure_async and use it everywhere fixtures are built at import time. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/lib/core/harness/conformance/runner.py | 19 ++++++++++++ .../test_claude_code_conformance.py | 30 +++++++------------ .../conformance/test_codex_conformance.py | 7 +++-- .../test_pydantic_ai_conformance.py | 17 +++++++---- 4 files changed, 44 insertions(+), 29 deletions(-) diff --git a/tests/lib/core/harness/conformance/runner.py b/tests/lib/core/harness/conformance/runner.py index e6928669a..02a07f726 100644 --- a/tests/lib/core/harness/conformance/runner.py +++ b/tests/lib/core/harness/conformance/runner.py @@ -97,6 +97,25 @@ def all_fixtures() -> list[Fixture]: return list(_REGISTRY) +def run_pure_async(coro: Any) -> Any: + """Drive a *pure* (I/O-free) coroutine to completion without an event loop. + + Conformance fixtures are built at import time so they can parametrize the + tests below. The fixture-building coroutines only iterate in-memory events + and never suspend on a real future, so we step them by hand instead of + ``asyncio.run()``. ``asyncio.run()`` at import raises ``RuntimeError`` when a + loop is already running (programmatic pytest, a Jupyter kernel, or a + session-scoped asyncio loop); this driver is unaffected by ambient loop + state. It raises if the coroutine ever suspends on real I/O. + """ + try: + coro.send(None) + except StopIteration as stop: + return stop.value + coro.close() + raise RuntimeError("conformance fixture build unexpectedly suspended on real I/O") + + def derive_all(events: list[StreamTaskMessage]) -> list[SpanSignal]: d = SpanDeriver() out: list[SpanSignal] = [] diff --git a/tests/lib/core/harness/conformance/test_claude_code_conformance.py b/tests/lib/core/harness/conformance/test_claude_code_conformance.py index 88643a4cd..010bc530b 100644 --- a/tests/lib/core/harness/conformance/test_claude_code_conformance.py +++ b/tests/lib/core/harness/conformance/test_claude_code_conformance.py @@ -29,8 +29,6 @@ from __future__ import annotations -from typing import Any - import pytest from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events @@ -38,6 +36,7 @@ from .runner import ( Fixture, register, + run_pure_async, run_cross_channel_conformance, ) @@ -155,25 +154,16 @@ async def _build_fixture(name: str, envelopes: list[dict]) -> Fixture: # Fixtures must exist before pytest collects (they parametrize the test below), # so they are built at import time. The conversion only iterates in-memory -# envelopes — it never suspends on a real future — so we drive the coroutine to -# completion by hand instead of asyncio.run(). asyncio.run() at import raises -# RuntimeError when an event loop is already running (programmatic pytest, a -# Jupyter kernel, or session-scoped asyncio loops); the loop-free driver below -# is unaffected by the ambient loop state. -def _run_pure_async(coro: Any) -> Any: - try: - coro.send(None) - except StopIteration as stop: - return stop.value - coro.close() - raise RuntimeError("conformance fixture build unexpectedly suspended on real I/O") - - +# envelopes — it never suspends on a real future — so we drive the coroutines to +# completion with the shared loop-free ``run_pure_async`` driver instead of +# asyncio.run(), which raises RuntimeError at import when an event loop is +# already running (programmatic pytest, a Jupyter kernel, or session-scoped +# asyncio loops). _FIXTURES: list[Fixture] = [ - _run_pure_async(_build_fixture("claude-code-text-only", _TEXT_ENVELOPES)), - _run_pure_async(_build_fixture("claude-code-tool-call-result", _TOOL_ENVELOPES)), - _run_pure_async(_build_fixture("claude-code-thinking-block", _THINKING_ENVELOPES)), - _run_pure_async(_build_fixture("claude-code-multi-step", _MULTI_STEP_ENVELOPES)), + run_pure_async(_build_fixture("claude-code-text-only", _TEXT_ENVELOPES)), + run_pure_async(_build_fixture("claude-code-tool-call-result", _TOOL_ENVELOPES)), + run_pure_async(_build_fixture("claude-code-thinking-block", _THINKING_ENVELOPES)), + run_pure_async(_build_fixture("claude-code-multi-step", _MULTI_STEP_ENVELOPES)), ] # Register into the shared registry so all_fixtures() can enumerate them diff --git a/tests/lib/core/harness/conformance/test_codex_conformance.py b/tests/lib/core/harness/conformance/test_codex_conformance.py index b3db4f56e..d51a73584 100644 --- a/tests/lib/core/harness/conformance/test_codex_conformance.py +++ b/tests/lib/core/harness/conformance/test_codex_conformance.py @@ -11,7 +11,6 @@ from __future__ import annotations -import asyncio from typing import Any, AsyncIterator import pytest @@ -19,7 +18,7 @@ from agentex.lib.core.harness.types import StreamTaskMessage from agentex.lib.adk._modules._codex_sync import convert_codex_to_agentex_events -from .runner import Fixture, register +from .runner import Fixture, register, run_pure_async async def _aiter(items: list[Any]) -> AsyncIterator[Any]: @@ -32,7 +31,9 @@ async def _collect(events: list[Any]) -> list[StreamTaskMessage]: def _build(events: list[Any]) -> list[StreamTaskMessage]: - return asyncio.run(_collect(events)) + # Loop-free driver: this runs at import time, where asyncio.run() would raise + # under an already-running loop (programmatic pytest, notebooks). + return run_pure_async(_collect(events)) # --------------------------------------------------------------------------- diff --git a/tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py b/tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py index 3594de474..5d9952334 100644 --- a/tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py +++ b/tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py @@ -16,7 +16,6 @@ from __future__ import annotations -import asyncio from typing import Any, AsyncIterator import pytest @@ -39,6 +38,7 @@ from .runner import ( Fixture, register, + run_pure_async, run_cross_channel_conformance, ) @@ -62,7 +62,12 @@ async def _canonical(pydantic_events: list[Any]) -> list[Any]: def _build_fixtures() -> list[Fixture]: - """Build all pydantic-ai conformance fixtures synchronously via asyncio.run.""" + """Build all pydantic-ai conformance fixtures synchronously at import time. + + Uses the loop-free ``run_pure_async`` driver rather than ``asyncio.run()``, + which would raise under an already-running loop (programmatic pytest, + notebooks) since this runs during module import. + """ # ------------------------------------------------------------------ # # 1. Text-only run: simple streaming text response. @@ -139,10 +144,10 @@ def _build_fixtures() -> list[Fixture]: PartEndEvent(index=0, part=TextPart(content="It's cloudy and 15C in London.")), ] - text_only_events = asyncio.run(_canonical(text_only_pydantic)) - tool_call_events = asyncio.run(_canonical(tool_call_pydantic)) - reasoning_events = asyncio.run(_canonical(reasoning_pydantic)) - multi_step_events = asyncio.run(_canonical(multi_step_pydantic)) + text_only_events = run_pure_async(_canonical(text_only_pydantic)) + tool_call_events = run_pure_async(_canonical(tool_call_pydantic)) + reasoning_events = run_pure_async(_canonical(reasoning_pydantic)) + multi_step_events = run_pure_async(_canonical(multi_step_pydantic)) return [ Fixture(name="pydantic-ai-text-only", events=text_only_events),