Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

63 changes: 63 additions & 0 deletions tests/lib/core/harness/_fakes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Shared test doubles for the unified harness test suites.

A single superset implementation of the in-memory tracing backend used across
the harness tests. Three recording shapes were previously duplicated:

- Shape-1 (richest): ``started`` = ``(name, parent_id, input)`` 3-tuples,
``ended`` = ``(name, output)`` 2-tuples, plus an ``ended_spans`` list of the
closed ``FakeSpan`` objects (which carry ``.name``, ``.output``, ``.data``).
- Shape-2: ``started`` = ``(name, parent_id)`` 2-tuples, ``ended`` =
``(name, output)``.
- Shape-3: ``started`` = bare names, ``ended`` = bare outputs.

``FakeTracing`` records the richest (shape-1) form and exposes read-only
convenience properties (``started_names``, ``started_pairs``,
``ended_outputs``) so shape-2 and shape-3 assertions stay clean.
"""

from __future__ import annotations

from typing import Any


class FakeSpan:
    """In-memory stand-in for a tracing span.

    ``output`` and ``data`` both start as ``None``. ``FakeTracing.end_span``
    records whatever ``output`` holds at the moment the span is closed.
    """

    def __init__(self, name: str) -> None:
        self.name = name
        self.output: Any = None
        self.data: Any = None

    def __repr__(self) -> str:
        # Spans end up inside lists that tests assert on; a descriptive repr
        # keeps assertion-failure output readable instead of showing a bare
        # object address.
        return (
            f"{type(self).__name__}(name={self.name!r}, "
            f"output={self.output!r}, data={self.data!r})"
        )


class FakeTracing:
    """In-memory tracing backend that records every span start and end.

    Recorded state:

    - ``started``: one ``(name, parent_id, input)`` tuple per ``start_span``.
    - ``ended``: one ``(name, output)`` tuple per ``end_span``.
    - ``ended_spans``: the span objects handed to ``end_span``, in call order.
    """

    def __init__(self) -> None:
        self.started: list[tuple[str, Any, Any]] = []
        self.ended: list[tuple[str, Any]] = []
        self.ended_spans: list[FakeSpan] = []

    async def start_span(
        self,
        *,
        trace_id: str,
        name: str,
        input: Any = None,
        parent_id: Any = None,
        data: Any = None,
        task_id: Any = None,
    ) -> FakeSpan:
        """Record a span start and return a new ``FakeSpan`` for it.

        Only ``name``, ``parent_id`` and ``input`` are recorded; ``trace_id``,
        ``data`` and ``task_id`` are accepted but not stored.
        """
        start_record = (name, parent_id, input)
        self.started.append(start_record)
        new_span = FakeSpan(name)
        return new_span

    async def end_span(self, *, trace_id: str, span: FakeSpan) -> None:
        """Record the span's ``(name, output)`` and keep the span object itself."""
        end_record = (span.name, span.output)
        self.ended.append(end_record)
        self.ended_spans.append(span)

    @property
    def started_names(self) -> list[str]:
        """Names of all started spans, in start order."""
        return [span_name for (span_name, _parent_id, _payload) in self.started]

    @property
    def started_pairs(self) -> list[tuple[str, Any]]:
        """``(name, parent_id)`` for every started span, in start order."""
        return [
            (span_name, parent_id)
            for (span_name, parent_id, _payload) in self.started
        ]

    @property
    def ended_outputs(self) -> list[Any]:
        """Outputs of all ended spans, in end order."""
        return [span_output for (_span_name, span_output) in self.ended]
21 changes: 21 additions & 0 deletions tests/lib/core/harness/conformance/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Conformance-suite test setup.

Eagerly import every per-harness conformance module so each one's module-level
``register(...)`` calls run before any test executes. This makes
``all_fixtures()`` complete and independent of pytest's collection/import order
(the runner documents that cross-module registration order is not guaranteed),
so the cross-harness ``test_span_derivation_is_deterministic`` guard in
``test_conformance.py`` covers the full fixture set even when this directory is
run in isolation.
"""

from __future__ import annotations

# Importing these for their registration side effects only.
from . import (
test_codex_conformance, # noqa: F401
test_openai_conformance, # noqa: F401
test_langgraph_conformance, # noqa: F401
test_claude_code_conformance, # noqa: F401
test_pydantic_ai_conformance, # noqa: F401
Comment thread
declan-scale marked this conversation as resolved.
)
73 changes: 33 additions & 40 deletions tests/lib/core/harness/conformance/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
because:
- StreamingTaskMessageContext.close() persists initial_content when no deltas
have been streamed, so the message IS correctly persisted.
- It mirrors the pattern already used by the real _langgraph_async.py harness,
keeping behavioural parity.
- It mirrors the pattern already used by the real langgraph streaming helper
(now in _langgraph_turn.py), keeping behavioural parity.
- Switching to adk.messages.create would require an additional injectable
dependency, adding surface area for no observable benefit.
The conformance test treats this as an ACCEPTABLE envelope difference: at the
Expand All @@ -53,18 +53,14 @@
identical because both adapters drive the same SpanDeriver.observe() call
sequence and forward every signal to their tracer.

AGX1-377 fix: auto_send now DELIVERS streamed tool-request messages (Start+Done)
instead of dropping them. The conformance normaliser previously suppressed the
delivery for Start(tool_request)+Done on the yield channel to match auto_send's
old drop behaviour. That suppression is now removed: both channels produce a
LogicalDelivery for a streamed tool_request, and the cross-channel assertion
verifies it is delivered on both.
auto_send DELIVERS streamed tool-request messages (Start+Done): both channels
produce a LogicalDelivery for a streamed tool_request, and the cross-channel
assertion verifies it is delivered on both.
"""

from __future__ import annotations

import json
import types as _types
from typing import Any, NamedTuple, override
from dataclasses import dataclass

Expand All @@ -81,6 +77,8 @@
from agentex.types.reasoning_content_delta import ReasoningContentDelta
from agentex.lib.core.harness.span_derivation import SpanDeriver

from .._fakes import FakeTracing


@dataclass
class Fixture:
Expand All @@ -99,6 +97,25 @@ def all_fixtures() -> list[Fixture]:
return list(_REGISTRY)


def run_pure_async(coro: Any) -> Any:
    """Run an I/O-free coroutine to completion without any event loop.

    Conformance fixtures are built at import time so they can parametrize
    tests. Their builder coroutines only walk in-memory events and never
    suspend on a real future, so a single manual ``send(None)`` step finishes
    them. ``asyncio.run()`` is avoided because it raises ``RuntimeError`` at
    import when a loop is already running (programmatic pytest, a Jupyter
    kernel, or a session-scoped asyncio loop); this driver does not depend on
    ambient loop state.

    Returns the coroutine's return value. Raises ``RuntimeError`` (after
    closing the coroutine) if it suspends instead of finishing.
    """
    try:
        coro.send(None)
    except StopIteration as finished:
        # A coroutine that runs straight through signals completion by raising
        # StopIteration carrying its return value.
        result = finished.value
    else:
        # send() returned normally, so the coroutine yielded: it is waiting on
        # something this driver cannot provide.
        coro.close()
        raise RuntimeError("conformance fixture build unexpectedly suspended on real I/O")
    return result


def derive_all(events: list[StreamTaskMessage]) -> list[SpanSignal]:
d = SpanDeriver()
out: list[SpanSignal] = []
Expand Down Expand Up @@ -145,8 +162,8 @@ def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDe
- reasoning: initial_content.summary joined (from Start) prepended to
accumulated reasoning-content deltas (this catches a channel that drops
the summary)
- tool_request: JSON-sorted arguments from the Start content (AGX1-377: now
delivered on both channels, no longer suppressed)
- tool_request: JSON-sorted arguments from the Start content (delivered on
both channels)
- tool_response: str(content) from Full event
"""
from agentex.types.text_content import TextContent
Expand Down Expand Up @@ -191,9 +208,9 @@ def _yield_logical_deliveries(events: list[StreamTaskMessage]) -> list[LogicalDe
)
)
elif ctype == "tool_request" and isinstance(content, ToolRequestContent):
# AGX1-377 fix: auto_send now delivers streamed tool-request
# messages. Emit a delivery here so the cross-channel
# assertion verifies it is present on both channels.
# auto_send delivers streamed tool-request messages. Emit a
# delivery here so the cross-channel assertion verifies it is
# present on both channels.
deliveries.append(
LogicalDelivery(
content_type=ctype,
Expand Down Expand Up @@ -296,30 +313,6 @@ def streaming_task_message_context(
return _FakeCtx(self.sink, ctype, initial_content)


class _FakeTracing:
"""Minimal tracing backend: records started/ended span names + outputs."""

def __init__(self) -> None:
self.started: list[str] = []
self.ended: list[Any] = []

async def start_span(
self,
*,
trace_id: str,
name: str,
input: Any = None,
parent_id: Any = None,
data: Any = None,
task_id: Any = None,
) -> Any:
self.started.append(name)
return _types.SimpleNamespace()

async def end_span(self, *, trace_id: str, span: Any) -> None:
self.ended.append(getattr(span, "output", None))


class _RecordingTracer(SpanTracer):
"""SpanTracer that records every SpanSignal it actually receives.

Expand Down Expand Up @@ -486,7 +479,7 @@ async def run_cross_channel_conformance(
from agentex.lib.core.harness.yield_delivery import yield_events

# --- yield channel ---
tracer_yield = _RecordingTracer(tracing=_FakeTracing())
tracer_yield = _RecordingTracer(tracing=FakeTracing())
yield_out = [e async for e in yield_events(_gen(fixture.events), tracer=tracer_yield)]

# Span signals the yield channel actually emitted to its tracer
Expand All @@ -496,7 +489,7 @@ async def run_cross_channel_conformance(
yield_deliveries = _yield_text_reasoning_seq(_yield_logical_deliveries(yield_out))

# --- auto_send channel ---
tracer_auto = _RecordingTracer(tracing=_FakeTracing())
tracer_auto = _RecordingTracer(tracing=FakeTracing())
fake_streaming = _FakeStreaming()
await auto_send(
_gen(fixture.events),
Expand Down
30 changes: 10 additions & 20 deletions tests/lib/core/harness/conformance/test_claude_code_conformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@

from __future__ import annotations

from typing import Any

import pytest

from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events

from .runner import (
Fixture,
register,
run_pure_async,
run_cross_channel_conformance,
)

Expand Down Expand Up @@ -155,25 +154,16 @@ async def _build_fixture(name: str, envelopes: list[dict]) -> Fixture:

# Fixtures must exist before pytest collects (they parametrize the test below),
# so they are built at import time. The conversion only iterates in-memory
# envelopes — it never suspends on a real future — so we drive the coroutine to
# completion by hand instead of asyncio.run(). asyncio.run() at import raises
# RuntimeError when an event loop is already running (programmatic pytest, a
# Jupyter kernel, or session-scoped asyncio loops); the loop-free driver below
# is unaffected by the ambient loop state.
def _run_pure_async(coro: Any) -> Any:
try:
coro.send(None)
except StopIteration as stop:
return stop.value
coro.close()
raise RuntimeError("conformance fixture build unexpectedly suspended on real I/O")


# envelopes — it never suspends on a real future — so we drive the coroutines to
# completion with the shared loop-free ``run_pure_async`` driver instead of
# asyncio.run(), which raises RuntimeError at import when an event loop is
# already running (programmatic pytest, a Jupyter kernel, or session-scoped
# asyncio loops).
_FIXTURES: list[Fixture] = [
_run_pure_async(_build_fixture("claude-code-text-only", _TEXT_ENVELOPES)),
_run_pure_async(_build_fixture("claude-code-tool-call-result", _TOOL_ENVELOPES)),
_run_pure_async(_build_fixture("claude-code-thinking-block", _THINKING_ENVELOPES)),
_run_pure_async(_build_fixture("claude-code-multi-step", _MULTI_STEP_ENVELOPES)),
run_pure_async(_build_fixture("claude-code-text-only", _TEXT_ENVELOPES)),
run_pure_async(_build_fixture("claude-code-tool-call-result", _TOOL_ENVELOPES)),
run_pure_async(_build_fixture("claude-code-thinking-block", _THINKING_ENVELOPES)),
run_pure_async(_build_fixture("claude-code-multi-step", _MULTI_STEP_ENVELOPES)),
]

# Register into the shared registry so all_fixtures() can enumerate them
Expand Down
18 changes: 4 additions & 14 deletions tests/lib/core/harness/conformance/test_codex_conformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@

from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator

import pytest

from agentex.lib.core.harness.types import StreamTaskMessage
from agentex.lib.adk._modules._codex_sync import convert_codex_to_agentex_events

from .runner import Fixture, register, derive_all
from .runner import Fixture, register, run_pure_async


async def _aiter(items: list[Any]) -> AsyncIterator[Any]:
Expand All @@ -32,7 +31,9 @@ async def _collect(events: list[Any]) -> list[StreamTaskMessage]:


def _build(events: list[Any]) -> list[StreamTaskMessage]:
    """Run ``_collect(events)`` to completion synchronously and return its result."""
    # Loop-free driver: this runs at import time, where asyncio.run() would raise
    # under an already-running loop (programmatic pytest, notebooks).
    return run_pure_async(_collect(events))


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -208,17 +209,6 @@ def _build(events: list[Any]) -> list[StreamTaskMessage]:
_LOCAL_FIXTURES = [_CODEX_TEXT, _CODEX_TOOL, _CODEX_REASONING, _CODEX_MULTI]


@pytest.mark.parametrize("fixture", _LOCAL_FIXTURES, ids=lambda f: f.name)
def test_codex_span_derivation_is_deterministic(fixture: Fixture) -> None:
    """Two derivation passes over the same codex events must agree.

    This determinism is the invariant behind ``yield`` / ``auto_send``
    equivalence: both channels observe the same event stream, so their tracing
    side effects are identical.
    """
    first_pass = derive_all(fixture.events)
    second_pass = derive_all(fixture.events)
    assert first_pass == second_pass


@pytest.mark.parametrize("fixture", _LOCAL_FIXTURES, ids=lambda f: f.name)
def test_codex_events_are_non_empty(fixture: Fixture) -> None:
"""Every codex fixture yields at least one StreamTaskMessage*."""
Expand Down
38 changes: 26 additions & 12 deletions tests/lib/core/harness/conformance/test_conformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
Full vs Start+Done envelope difference is a documented, acceptable choice in
auto_send — see runner.py for the rationale).

AGX1-377 fix: auto_send now delivers streamed tool-request messages. The
suppression that previously prevented the yield normaliser from emitting a
LogicalDelivery for Start(tool_request)+Done is removed. Both channels now
produce a delivery for streamed tool_request, verified by the
"streamed-tool-request" fixture.
auto_send delivers streamed tool-request messages: both channels produce a
delivery for streamed tool_request, verified by the "streamed-tool-request"
fixture.
"""

from __future__ import annotations
Expand Down Expand Up @@ -134,9 +132,8 @@
StreamTaskMessageDone(type="done", index=0),
],
),
# fixture 4: streamed tool_request (AGX1-377 fix) — tool_request delivered
# via Start+Done (no Full). auto_send now delivers this instead of dropping
# it. Both channels must produce a LogicalDelivery for this fixture.
# fixture 4: streamed tool_request — tool_request delivered via Start+Done
# (no Full). Both channels must produce a LogicalDelivery for this fixture.
Fixture(
name="streamed-tool-request",
events=[
Expand Down Expand Up @@ -275,11 +272,28 @@ async def test_cross_channel_equivalence(fixture: Fixture) -> None:
# ---------------------------------------------------------------------------


def test_span_derivation_is_deterministic() -> None:
    """Span derivation over the same event list is idempotent, for EVERY
    registered fixture across all harnesses.

    ``all_fixtures()`` is read at run time (not at collection/parametrize time)
    so it sees fixtures registered by every conformance module, regardless of
    import/collection order. The per-harness conformance modules are imported
    eagerly via ``conftest.py`` in this directory, so this test covers the full
    cross-harness fixture set even when run in isolation. (Parametrizing on
    ``all_fixtures()`` at import time would freeze the set to whatever happened
    to be registered before this module was collected.)

    Retained as a lightweight regression guard. The primary cross-channel
    guarantee is asserted in test_cross_channel_equivalence above.
    """
    fixtures = all_fixtures()
    # Guard against silently testing only this module's generic fixtures: the
    # registry must also hold fixtures contributed by the per-harness modules.
    assert len(fixtures) > len(_FIXTURES), (
        "expected per-harness fixtures to be registered in addition to the "
        f"{len(_FIXTURES)} generic ones; got {len(fixtures)} total — a conformance "
        "module's fixtures are not being registered (check conftest imports)"
    )
    for fixture in fixtures:
        assert derive_all(fixture.events) == derive_all(fixture.events), (
            f"[{fixture.name}] span derivation is not deterministic"
        )
Loading
Loading