|
| 1 | +# Unified Harness Surface |
| 2 | + |
| 3 | +The unified harness surface gives every agent harness (pydantic-ai, LangGraph, OpenAI Agents, and future parsers) a single, shared path to streaming, message persistence, and tracing. The Agentex `StreamTaskMessage*` event stream is the canonical wire format. A harness tap produces that stream once; the shared machinery delivers it and derives spans from it. |
| 4 | + |
| 5 | +All public names are re-exported from `agentex.lib.adk`: |
| 6 | + |
| 7 | +```python |
| 8 | +from agentex.lib.adk import ( |
| 9 | + UnifiedEmitter, |
| 10 | + SpanTracer, |
| 11 | + TurnUsage, |
| 12 | + TurnResult, |
| 13 | + HarnessTurn, |
| 14 | + StreamTaskMessage, |
| 15 | + OpenSpan, |
| 16 | + CloseSpan, |
| 17 | + SpanSignal, |
| 18 | +) |
| 19 | +``` |
| 20 | + |
| 21 | +The implementation lives at `src/agentex/lib/core/harness/`. |
| 22 | + |
| 23 | +--- |
| 24 | + |
| 25 | +## The canonical stream: `StreamTaskMessage` |
| 26 | + |
| 27 | +`StreamTaskMessage` is a union of the four wire-protocol update types: |
| 28 | + |
| 29 | +``` |
| 30 | +StreamTaskMessageStart - opens a content slot (text, reasoning, tool request, ...) |
| 31 | +StreamTaskMessageDelta - appends a token/fragment to an open slot |
| 32 | +StreamTaskMessageFull - posts a complete message in one shot (tool response, ...) |
| 33 | +StreamTaskMessageDone - closes an open slot |
| 34 | +``` |
| 35 | + |
| 36 | +Every harness tap produces a sequence of these. Everything downstream (delivery, tracing) reads the same sequence. |
| 37 | + |
| 38 | +--- |
| 39 | + |
| 40 | +## Per-harness taps: `convert_<harness>_to_agentex_events` |
| 41 | + |
| 42 | +A tap is an async generator that translates the harness's native event stream into `StreamTaskMessage*` events. The currently shipped taps are: |
| 43 | + |
| 44 | +| Harness | Tap function | Exported from | |
| 45 | +|---|---|---| |
| 46 | +| pydantic-ai | `convert_pydantic_ai_to_agentex_events` | `agentex.lib.adk` | |
| 47 | +| LangGraph | `convert_langgraph_to_agentex_events` | `agentex.lib.adk` | |
| 48 | + |
| 49 | +Taps for claude-code and codex will be added in subsequent PRs (AGX1-420, AGX1-421) and exported from `agentex.lib.adk` in the same way. |
| 50 | + |
| 51 | +--- |
| 52 | + |
| 53 | +## `HarnessTurn` protocol |
| 54 | + |
| 55 | +`HarnessTurn` is the interface a harness turn object must satisfy to plug into `UnifiedEmitter`: |
| 56 | + |
| 57 | +```python |
| 58 | +@runtime_checkable |
| 59 | +class HarnessTurn(Protocol): |
| 60 | + @property |
| 61 | + def events(self) -> AsyncIterator[StreamTaskMessage]: ... |
| 62 | + |
| 63 | + def usage(self) -> TurnUsage: ... |
| 64 | +``` |
| 65 | + |
| 66 | +`events` is the canonical stream for this turn. `usage()` is valid only after `events` is exhausted (async generators cannot cleanly return a value to the consumer, so usage travels out-of-band). |
| 67 | + |
| 68 | +--- |
| 69 | + |
| 70 | +## `TurnUsage` |
| 71 | + |
| 72 | +Token counts and cost for one turn, harness-independent: |
| 73 | + |
| 74 | +```python |
| 75 | +class TurnUsage(BaseModel): |
| 76 | + model: str | None = None |
| 77 | + input_tokens: int | None = None |
| 78 | + output_tokens: int | None = None |
| 79 | + cached_input_tokens: int | None = None |
| 80 | + reasoning_tokens: int | None = None |
| 81 | + total_tokens: int | None = None |
| 82 | + cost_usd: float | None = None |
| 83 | + duration_ms: int | None = None |
| 84 | + num_llm_calls: int = 0 |
| 85 | + num_tool_calls: int = 0 |
| 86 | + num_reasoning_blocks: int = 0 |
| 87 | +``` |
| 88 | + |
| 89 | +Field names align with `agentex.lib.core.observability.llm_metrics` for easy conversion. |
| 90 | + |
| 91 | +--- |
| 92 | + |
| 93 | +## `UnifiedEmitter` |
| 94 | + |
| 95 | +`UnifiedEmitter` ties a turn's canonical stream, tracing context, and delivery mode together. Construct one per turn with the task/trace context from the request: |
| 96 | + |
| 97 | +```python |
| 98 | +emitter = UnifiedEmitter( |
| 99 | + task_id=params.task.id, |
| 100 | + trace_id=params.task.id, # or None to disable tracing |
| 101 | + parent_span_id=turn_span.id if turn_span else None, |
| 102 | +) |
| 103 | +``` |
| 104 | + |
| 105 | +**Tracing is on by default** when `trace_id` is provided. To disable it explicitly, pass `tracer=False`. To inject a custom `SpanTracer` (e.g. in tests), pass it as `tracer=<instance>`. |
| 106 | + |
| 107 | +### Delivery mode 1: `yield_turn` (sync HTTP ACP) |
| 108 | + |
| 109 | +For sync ACP agents that return events directly over the HTTP response: |
| 110 | + |
| 111 | +```python |
| 112 | +@acp.on_message_send |
| 113 | +async def handle(params): |
| 114 | + turn = MyHarnessTurn(params) # implements HarnessTurn |
| 115 | + async for event in emitter.yield_turn(turn): |
| 116 | + yield event |
| 117 | +``` |
| 118 | + |
| 119 | +`yield_turn` forwards each event to the caller and traces spans as a side effect. It is a passthrough when `tracer` is `None`. |
| 120 | + |
| 121 | +### Delivery mode 2: `auto_send_turn` (async/Temporal) |
| 122 | + |
| 123 | +For async or Temporal agents that push to the task stream via Redis: |
| 124 | + |
| 125 | +```python |
| 126 | +result: TurnResult = await emitter.auto_send_turn(turn, created_at=workflow.now()) |
| 127 | +``` |
| 128 | + |
| 129 | +`auto_send_turn` drives `adk.streaming` contexts for every message in the stream, derives and records spans, and returns a `TurnResult` with the final text and usage. Pass `created_at` under Temporal to back-date message timestamps deterministically. |
| 130 | + |
| 131 | +--- |
| 132 | + |
| 133 | +## `TurnResult` |
| 134 | + |
| 135 | +```python |
| 136 | +class TurnResult(BaseModel): |
| 137 | + final_text: str = "" |
| 138 | + usage: TurnUsage = TurnUsage() |
| 139 | +``` |
| 140 | + |
| 141 | +Returned by `auto_send_turn`. `final_text` is the last text segment of the turn (multi-step runs return only the final segment, matching `stream_langgraph_events` / `stream_pydantic_ai_events` semantics). |
| 142 | + |
| 143 | +--- |
| 144 | + |
| 145 | +## Tracing: span derivation |
| 146 | + |
| 147 | +Spans are derived from the canonical stream by `SpanDeriver` (pure, no `adk` dependency) and dispatched to `adk.tracing` by `SpanTracer`. The mapping: |
| 148 | + |
| 149 | +- `StreamTaskMessageStart(ToolRequestContent)` + `StreamTaskMessageDone` on that index -> tool span open (keyed by `tool_call_id`) |
| 150 | +- `StreamTaskMessageFull(ToolResponseContent)` whose `tool_call_id` was opened -> tool span close |
| 151 | +- `StreamTaskMessageFull(ToolRequestContent)` (harnesses that emit tool calls as Full) -> opens a tool span; matching `Full(ToolResponseContent)` closes it |
| 152 | +- `StreamTaskMessageStart(ReasoningContent)` + `StreamTaskMessageDone` -> reasoning span |
| 153 | + |
| 154 | +`SpanTracer` is `SpanDeriver`'s consumer. You can inject a custom `SpanTracer` via `UnifiedEmitter(tracer=<instance>)` for advanced use or testing. |
| 155 | + |
| 156 | +--- |
| 157 | + |
| 158 | +## Usage examples by channel |
| 159 | + |
| 160 | +### Sync ACP (pydantic-ai tap) |
| 161 | + |
| 162 | +```python |
| 163 | +import agentex.lib.adk as adk |
| 164 | +from agentex.lib.adk import UnifiedEmitter, convert_pydantic_ai_to_agentex_events |
| 165 | + |
| 166 | +@acp.on_message_send |
| 167 | +async def handle(params): |
| 168 | + task_id = params.task.id |
| 169 | + async with adk.tracing.span(trace_id=task_id, name="message", ...) as turn_span: |
| 170 | + emitter = UnifiedEmitter( |
| 171 | + task_id=task_id, |
| 172 | + trace_id=task_id, |
| 173 | + parent_span_id=turn_span.id if turn_span else None, |
| 174 | + ) |
| 175 | + tap = convert_pydantic_ai_to_agentex_events(pydantic_stream) |
| 176 | + # wrap tap in a HarnessTurn then yield_turn, or yield directly: |
| 177 | + async for event in tap: |
| 178 | + yield event |
| 179 | +``` |
| 180 | + |
| 181 | +For the pre-unified sync path the tap is still yielded directly; `UnifiedEmitter.yield_turn` is the forward-looking integration point when a `HarnessTurn` wrapper is available. |
| 182 | + |
| 183 | +### Async Temporal (auto-send) |
| 184 | + |
| 185 | +```python |
| 186 | +from agentex.lib.adk import UnifiedEmitter |
| 187 | + |
| 188 | +emitter = UnifiedEmitter( |
| 189 | + task_id=task_id, |
| 190 | + trace_id=task_id, |
| 191 | + parent_span_id=parent_span_id, |
| 192 | +) |
| 193 | +result = await emitter.auto_send_turn(turn, created_at=workflow.now()) |
| 194 | +# result.final_text — last text segment |
| 195 | +# result.usage — TurnUsage (tokens, cost, ...) |
| 196 | +``` |
0 commit comments