Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Unreleased

### ⚠ BREAKING CHANGES

* **harness:** removed the deprecated bespoke LangGraph tracing handler `create_langgraph_tracing_handler` (and its `AgentexLangGraphTracingHandler` class) from the public `agentex.lib.adk` surface. Span tracing is now derived from the canonical `StreamTaskMessage*` stream by `UnifiedEmitter` β€” wrap your run in the harness `*Turn` and drive `UnifiedEmitter.yield_turn` / `auto_send_turn`. The `agentex init` templates were migrated accordingly.
* **harness:** each harness now exposes exactly `_<harness>_sync.py` + `_<harness>_turn.py` under `agentex.lib.adk._modules`. The OpenAI harness `OpenAITurn` and `convert_openai_to_agentex_events` moved to `agentex.lib.adk._modules._openai_turn` / `_openai_sync`; back-compat shims remain at `agentex.lib.adk.providers._modules.{openai_turn,sync_provider}` for one release. Public facade names (`stream_pydantic_ai_events`, `stream_langgraph_events`, `emit_langgraph_messages`, etc.) are unchanged.

### Features

* **tracing:** emit OTel metrics for async span queue depth, batch drain, and SGP export success/failure (HTTP status labels). Disable SDK-side recording with ``AGENTEX_TRACING_METRICS=0``.
Expand Down
20 changes: 12 additions & 8 deletions adk/docs/harness.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,17 @@ Every harness tap produces a sequence of these. Everything downstream (delivery,

## Per-harness taps: `convert_<harness>_to_agentex_events`

A tap is an async generator that translates the harness's native event stream into `StreamTaskMessage*` events. The currently shipped taps are:
A tap is an async generator that translates the harness's native event stream into `StreamTaskMessage*` events. The shipped taps are:

| Harness | Tap function | Exported from |
|---|---|---|
| pydantic-ai | `convert_pydantic_ai_to_agentex_events` | `agentex.lib.adk` |
| LangGraph | `convert_langgraph_to_agentex_events` | `agentex.lib.adk` |
| claude-code | `convert_claude_code_to_agentex_events` | `agentex.lib.adk` |
| codex | `convert_codex_to_agentex_events` | `agentex.lib.adk` |
| OpenAI Agents | `convert_openai_to_agentex_events` | `agentex.lib.adk.providers._modules.sync_provider` |

Taps for claude-code and codex will be added in subsequent PRs (AGX1-420, AGX1-421) and exported from `agentex.lib.adk` in the same way.
Each harness also provides a `HarnessTurn` wrapper that pairs its tap's event stream with usage extraction: `PydanticAITurn`, `LangGraphTurn`, `ClaudeCodeTurn`, `CodexTurn`, and `OpenAITurn`.

---

Expand Down Expand Up @@ -157,11 +160,13 @@ Spans are derived from the canonical stream by `SpanDeriver` (pure, no `adk` dep

## Usage examples by channel

### Sync ACP (pydantic-ai tap)
### Sync ACP (`yield_turn`)

Build the harness's `HarnessTurn` wrapper and iterate `emitter.yield_turn(turn)` β€” the emitter forwards each event to the caller and traces spans as a side effect:

```python
import agentex.lib.adk as adk
from agentex.lib.adk import UnifiedEmitter, convert_pydantic_ai_to_agentex_events
from agentex.lib.adk import UnifiedEmitter, ClaudeCodeTurn

@acp.on_message_send
async def handle(params):
Expand All @@ -172,13 +177,12 @@ async def handle(params):
trace_id=task_id,
parent_span_id=turn_span.id if turn_span else None,
)
tap = convert_pydantic_ai_to_agentex_events(pydantic_stream)
# wrap tap in a HarnessTurn then yield_turn, or yield directly:
async for event in tap:
turn = ClaudeCodeTurn(claude_code_stream) # any HarnessTurn
async for event in emitter.yield_turn(turn):
yield event
```

For the pre-unified sync path the tap is still yielded directly; `UnifiedEmitter.yield_turn` is the forward-looking integration point when a `HarnessTurn` wrapper is available.
Every harness follows the same shape β€” swap `ClaudeCodeTurn` for `PydanticAITurn`, `LangGraphTurn`, `CodexTurn`, or `OpenAITurn` and feed it that harness's native stream.

### Async Temporal (auto-send)

Expand Down
11 changes: 5 additions & 6 deletions src/agentex/lib/adk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from agentex.lib.adk._modules.agents import AgentsModule
from agentex.lib.adk._modules.agent_task_tracker import AgentTaskTrackerModule
from agentex.lib.adk._modules.checkpointer import create_checkpointer
from agentex.lib.adk._modules._langgraph_tracing import create_langgraph_tracing_handler
from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events
from agentex.lib.adk._modules._langgraph_messages import emit_langgraph_messages
from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events
from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn
from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn, stream_langgraph_events
from agentex.lib.adk._modules._langgraph_sync import (
emit_langgraph_messages,
convert_langgraph_to_agentex_events,
)
from agentex.lib.adk._modules._pydantic_ai_async import stream_pydantic_ai_events
from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events
from agentex.lib.adk._modules._pydantic_ai_tracing import create_pydantic_ai_tracing_handler
Expand Down Expand Up @@ -68,7 +68,6 @@
"agent_task_tracker",
# Checkpointing / LangGraph
"create_checkpointer",
"create_langgraph_tracing_handler",
"stream_langgraph_events",
"emit_langgraph_messages",
"convert_langgraph_to_agentex_events",
Expand Down
65 changes: 0 additions & 65 deletions src/agentex/lib/adk/_modules/_langgraph_async.py

This file was deleted.

85 changes: 0 additions & 85 deletions src/agentex/lib/adk/_modules/_langgraph_messages.py

This file was deleted.

83 changes: 81 additions & 2 deletions src/agentex/lib/adk/_modules/_langgraph_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ async def convert_langgraph_to_agentex_events(
Supports both regular models (chunk.content is a str) and reasoning models
like gpt-5/o1/o3 (chunk.content is a list of typed content blocks).

AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` (from
"updates" events), NOT Start+Delta+Done like pydantic-ai. No coalesce_tool_requests
LangGraph emits tool requests as ``StreamTaskMessageFull`` (from "updates"
events), NOT Start+Delta+Done like pydantic-ai. No coalesce_tool_requests
option is needed for LangGraph.

Args:
Expand Down Expand Up @@ -271,3 +271,82 @@ async def convert_langgraph_to_agentex_events(
yield StreamTaskMessageDone(type="done", index=message_index)
if reasoning_streaming:
yield StreamTaskMessageDone(type="done", index=message_index)


async def emit_langgraph_messages(messages: list[Any], task_id: str) -> str:
"""Create Agentex messages for a list of LangGraph messages.

This is the non-streaming counterpart to ``stream_langgraph_events``. Use it
when you run a LangGraph graph with ``ainvoke`` (for example a Temporal-backed
agent using the LangGraph plugin, where streaming deltas aren't available) and
want to surface the resulting messages to the Agentex UI after the fact.

It maps LangGraph/LangChain message objects to Agentex content types:

- ``AIMessage`` tool calls -> ``ToolRequestContent`` (one per call)
- ``AIMessage`` text content -> ``TextContent``
- ``ToolMessage`` -> ``ToolResponseContent``

Pass only the messages produced this turn (e.g. ``messages[already_emitted:]``)
so each message is surfaced exactly once across a multi-turn conversation.

Args:
messages: LangGraph/LangChain message objects to surface β€” typically
the new messages a turn produced.
task_id: The Agentex task to create messages on.

Returns:
The last assistant text emitted (useful as a span/turn output), or "".
"""
# Lazy imports so langchain isn't required at module load time.
from langchain_core.messages import AIMessage, ToolMessage

from agentex.lib import adk
from agentex.types.text_content import TextContent
from agentex.types.tool_request_content import ToolRequestContent
from agentex.types.tool_response_content import ToolResponseContent

final_text = ""
for message in messages:
if isinstance(message, AIMessage):
for tool_call in message.tool_calls or []:
await adk.messages.create(
task_id=task_id,
content=ToolRequestContent(
author="agent",
tool_call_id=tool_call["id"],
name=tool_call["name"],
arguments=tool_call["args"],
),
)
# ``content`` may be a plain string (OpenAI) or a list of content
# blocks (Anthropic/Claude via LangChain, e.g.
# ``[{"type": "text", "text": "..."}]``). Extract and join the text
# so the response is visible regardless of the underlying model.
if isinstance(message.content, str):
text = message.content
else:
text = "".join(
block.get("text", "") if isinstance(block, dict) else str(block)
for block in message.content
if not isinstance(block, dict) or block.get("type") == "text"
)
if text:
final_text = text
await adk.messages.create(
task_id=task_id,
content=TextContent(author="agent", content=text, format="markdown"),
)
elif isinstance(message, ToolMessage):
await adk.messages.create(
task_id=task_id,
content=ToolResponseContent(
author="agent",
tool_call_id=message.tool_call_id,
name=message.name or "unknown",
content=message.content
if isinstance(message.content, str)
else str(message.content),
),
)
return final_text
Loading
Loading