From 5f8a57a6d83ec202cf9b464f48c018cdf7c557b3 Mon Sep 17 00:00:00 2001
From: Declan Brady
Date: Tue, 23 Jun 2026 09:55:30 -0400
Subject: [PATCH 1/2] refactor(harness)!: consolidate the LangGraph harness, remove tracing handler

Collapse _langgraph_async / _langgraph_messages / _langgraph_tracing into
_langgraph_sync (emit_langgraph_messages, convert helpers) and
_langgraph_turn (stream_langgraph_events). Span tracing is now derived from
the canonical stream by UnifiedEmitter, so create_langgraph_tracing_handler
is removed. Public facade names are unchanged except the removed handler.
All in-repo consumers were migrated to the unified surface in the preceding
PRs.

BREAKING CHANGE: create_langgraph_tracing_handler is removed.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 CHANGELOG.md | 5 +
 adk/docs/harness.md | 20 +-
 src/agentex/lib/adk/__init__.py | 11 +-
 .../lib/adk/_modules/_langgraph_async.py | 65 -----
 .../lib/adk/_modules/_langgraph_messages.py | 85 ------
 .../lib/adk/_modules/_langgraph_sync.py | 83 +++++-
 .../lib/adk/_modules/_langgraph_tracing.py | 273 ------------------
 .../lib/adk/_modules/_langgraph_turn.py | 60 +++-
 src/agentex/lib/core/harness/auto_send.py | 10 +-
 src/agentex/lib/core/harness/tracer.py | 2 +-
 tests/lib/adk/test_langgraph_async.py | 2 +-
 tests/lib/adk/test_langgraph_sync.py | 182 ++++++++++--
 tests/lib/adk/test_langgraph_sync_unified.py | 214 --------------
 .../harness/test_harness_langgraph_async.py | 38 +--
 .../harness/test_harness_langgraph_sync.py | 34 +--
 .../test_harness_langgraph_temporal.py | 7 +-
 16 files changed, 344 insertions(+), 747 deletions(-)
 delete mode 100644 src/agentex/lib/adk/_modules/_langgraph_async.py
 delete mode 100644 src/agentex/lib/adk/_modules/_langgraph_messages.py
 delete mode 100644 src/agentex/lib/adk/_modules/_langgraph_tracing.py
 delete mode 100644 tests/lib/adk/test_langgraph_sync_unified.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a9b0590c8..8edf5fb60 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,11 @@
 
 ## Unreleased
 
+### ⚠ BREAKING CHANGES
+
+* **harness:** removed the deprecated bespoke tracing handlers `create_langgraph_tracing_handler` / `create_pydantic_ai_tracing_handler` (and their `AgentexLangGraphTracingHandler` / `AgentexPydanticAITracingHandler` classes) from the public `agentex.lib.adk` surface. Span tracing is now derived from the canonical `StreamTaskMessage*` stream by `UnifiedEmitter` — wrap your run in the harness `*Turn` and drive `UnifiedEmitter.yield_turn` / `auto_send_turn`. The `agentex init` templates were migrated accordingly.
+* **harness:** each harness now exposes exactly `__sync.py` + `__turn.py` under `agentex.lib.adk._modules`. The OpenAI harness `OpenAITurn` and `convert_openai_to_agentex_events` moved to `agentex.lib.adk._modules._openai_turn` / `_openai_sync`; back-compat shims remain at `agentex.lib.adk.providers._modules.{openai_turn,sync_provider}` for one release. Public facade names (`stream_pydantic_ai_events`, `stream_langgraph_events`, `emit_langgraph_messages`, etc.) are unchanged.
+
 ### Features
 
 * **tracing:** emit OTel metrics for async span queue depth, batch drain, and SGP export success/failure (HTTP status labels). Disable SDK-side recording with ``AGENTEX_TRACING_METRICS=0``.
diff --git a/adk/docs/harness.md b/adk/docs/harness.md
index 6a9d8947a..d81835a03 100644
--- a/adk/docs/harness.md
+++ b/adk/docs/harness.md
@@ -39,14 +39,17 @@ Every harness tap produces a sequence of these. Everything downstream (delivery,
 
 ## Per-harness taps: `convert__to_agentex_events`
 
-A tap is an async generator that translates the harness's native event stream into `StreamTaskMessage*` events. The currently shipped taps are:
+A tap is an async generator that translates the harness's native event stream into `StreamTaskMessage*` events. The shipped taps are:
 
 | Harness | Tap function | Exported from |
 |---|---|---|
 | pydantic-ai | `convert_pydantic_ai_to_agentex_events` | `agentex.lib.adk` |
 | LangGraph | `convert_langgraph_to_agentex_events` | `agentex.lib.adk` |
+| claude-code | `convert_claude_code_to_agentex_events` | `agentex.lib.adk` |
+| codex | `convert_codex_to_agentex_events` | `agentex.lib.adk` |
+| OpenAI Agents | `convert_openai_to_agentex_events` | `agentex.lib.adk.providers._modules.sync_provider` |
 
-Taps for claude-code and codex will be added in subsequent PRs (AGX1-420, AGX1-421) and exported from `agentex.lib.adk` in the same way.
+Each harness also provides a `HarnessTurn` wrapper that pairs its tap's event stream with usage extraction: `PydanticAITurn`, `LangGraphTurn`, `ClaudeCodeTurn`, `CodexTurn`, and `OpenAITurn`.
 
 ---
 
@@ -157,11 +160,13 @@ Spans are derived from the canonical stream by `SpanDeriver` (pure, no `adk` dep
 
 ## Usage examples by channel
 
-### Sync ACP (pydantic-ai tap)
+### Sync ACP (`yield_turn`)
+
+Build the harness's `HarnessTurn` wrapper and iterate `emitter.yield_turn(turn)` — the emitter forwards each event to the caller and traces spans as a side effect:
 
 ```python
 import agentex.lib.adk as adk
-from agentex.lib.adk import UnifiedEmitter, convert_pydantic_ai_to_agentex_events
+from agentex.lib.adk import UnifiedEmitter, ClaudeCodeTurn
 
 @acp.on_message_send
 async def handle(params):
@@ -172,13 +177,12 @@ async def handle(params):
         trace_id=task_id,
         parent_span_id=turn_span.id if turn_span else None,
     )
-    tap = convert_pydantic_ai_to_agentex_events(pydantic_stream)
-    # wrap tap in a HarnessTurn then yield_turn, or yield directly:
-    async for event in tap:
+    turn = ClaudeCodeTurn(claude_code_stream)  # any HarnessTurn
+    async for event in emitter.yield_turn(turn):
         yield event
 ```
 
-For the pre-unified sync path the tap is still yielded directly; `UnifiedEmitter.yield_turn` is the forward-looking integration point when a `HarnessTurn` wrapper is available.
+Every harness follows the same shape — swap `ClaudeCodeTurn` for `PydanticAITurn`, `LangGraphTurn`, `CodexTurn`, or `OpenAITurn` and feed it that harness's native stream.
 
 ### Async Temporal (auto-send)
 
diff --git a/src/agentex/lib/adk/__init__.py b/src/agentex/lib/adk/__init__.py
index e618a20d3..86fa90253 100644
--- a/src/agentex/lib/adk/__init__.py
+++ b/src/agentex/lib/adk/__init__.py
@@ -6,11 +6,11 @@
 from agentex.lib.adk._modules.agents import AgentsModule
 from agentex.lib.adk._modules.agent_task_tracker import AgentTaskTrackerModule
 from agentex.lib.adk._modules.checkpointer import create_checkpointer
-from agentex.lib.adk._modules._langgraph_tracing import create_langgraph_tracing_handler
-from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events
-from agentex.lib.adk._modules._langgraph_messages import emit_langgraph_messages
-from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events
-from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn
+from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn, stream_langgraph_events
+from agentex.lib.adk._modules._langgraph_sync import (
+    emit_langgraph_messages,
+    convert_langgraph_to_agentex_events,
+)
 from agentex.lib.adk._modules._pydantic_ai_async import stream_pydantic_ai_events
 from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events
 from agentex.lib.adk._modules._pydantic_ai_tracing import create_pydantic_ai_tracing_handler
@@ -68,7 +68,6 @@
     "agent_task_tracker",
     # Checkpointing / LangGraph
     "create_checkpointer",
-    "create_langgraph_tracing_handler",
     "stream_langgraph_events",
     "emit_langgraph_messages",
     "convert_langgraph_to_agentex_events",
diff --git a/src/agentex/lib/adk/_modules/_langgraph_async.py b/src/agentex/lib/adk/_modules/_langgraph_async.py
deleted file mode 100644
index 02ef059eb..000000000
--- a/src/agentex/lib/adk/_modules/_langgraph_async.py
+++ /dev/null
@@ -1,65 +0,0 @@
-"""Async LangGraph streaming helper for Agentex.
-
-Converts LangGraph graph.astream() events into Agentex streaming updates
-and pushes them to Redis via adk.streaming contexts. For use with async
-ACP agents that stream via Redis rather than HTTP yields.
-
-Unified surface
----------------
-This module is now implemented on top of ``LangGraphTurn`` and
-``UnifiedEmitter.auto_send_turn``, the same surface used by every other
-harness adapter (pydantic-ai, openai-agents, etc.). The public signature
-and return type are preserved identically.
-
-AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` events
-(from "updates" events), NOT Start+Delta+Done like pydantic-ai. ``auto_send``
-handles Full events correctly; no coalescing wrapper is needed.
-"""
-
-from agentex.lib.utils.temporal import workflow_now_if_in_workflow
-
-
-async def stream_langgraph_events(stream, task_id: str) -> str:
-    """Stream LangGraph events to Agentex via Redis.
-
-    Processes the stream from graph.astream() called with
-    stream_mode=["messages", "updates"] and pushes text, reasoning,
-    tool request, and tool response messages through Redis streaming
-    contexts.
-
-    Supports both regular models (chunk.content is a str) and reasoning
-    models like gpt-5/o1/o3 (chunk.content is a list of typed content blocks
-    in the Responses API responses/v1 format).
-
-    Reimplemented on ``UnifiedEmitter.auto_send_turn(LangGraphTurn(...))`` for
-    cross-harness consistency. Behavior is identical to the previous bespoke
-    implementation (verified by characterization tests in test_langgraph_async.py).
-
-    AGX1-377 note: LangGraph emits tool requests as ``Full`` events (from "updates"),
-    NOT Start+Delta+Done like pydantic-ai. ``auto_send`` handles Full events
-    correctly; no coalescing wrapper is needed.
-
-    AGX1-378 note: ``created_at`` is set from ``workflow.now()`` when called inside a
-    Temporal workflow, matching the pattern used by the openai/litellm providers.
- Outside a workflow (plain async activities, sync agents) it is ``None`` and the - server's wall clock is used. - - Args: - stream: Async iterator from graph.astream(..., stream_mode=["messages", "updates"]) - task_id: The Agentex task ID to stream messages to. - - Returns: - The accumulated final text output from the agent. - """ - from agentex.lib.core.harness.emitter import UnifiedEmitter - from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn - - # AGX1-377 note: LangGraph emits tool requests as Full events (from "updates"), - # NOT Start+Delta+Done like pydantic-ai. auto_send handles Full events correctly; - # no coalescing wrapper is needed. - # AGX1-378: stamp messages with workflow.now() inside Temporal for deterministic - # created_at ordering; falls back to None (server wall clock) outside a workflow. - turn = LangGraphTurn(stream, model=None) - emitter = UnifiedEmitter(task_id=task_id, trace_id=None, parent_span_id=None) - result = await emitter.auto_send_turn(turn, created_at=workflow_now_if_in_workflow()) - return result.final_text diff --git a/src/agentex/lib/adk/_modules/_langgraph_messages.py b/src/agentex/lib/adk/_modules/_langgraph_messages.py deleted file mode 100644 index c8856755b..000000000 --- a/src/agentex/lib/adk/_modules/_langgraph_messages.py +++ /dev/null @@ -1,85 +0,0 @@ -"""Emit finished LangGraph messages as Agentex task messages. - -This is the non-streaming counterpart to ``stream_langgraph_events``. Use it -when you run a LangGraph graph with ``ainvoke`` (for example a Temporal-backed -agent using the LangGraph plugin, where streaming deltas aren't available) and -want to surface the resulting messages to the Agentex UI after the fact. - -It maps LangGraph/LangChain message objects to Agentex content types: - -- ``AIMessage`` tool calls → ``ToolRequestContent`` (one per call) -- ``AIMessage`` text content → ``TextContent`` -- ``ToolMessage`` → ``ToolResponseContent`` - -Pass only the messages produced this turn (e.g. 
``messages[already_emitted:]``) -so each message is surfaced exactly once across a multi-turn conversation. -""" - -from __future__ import annotations - -from typing import Any - - -async def emit_langgraph_messages(messages: list[Any], task_id: str) -> str: - """Create Agentex messages for a list of LangGraph messages. - - Args: - messages: LangGraph/LangChain message objects to surface — typically - the new messages a turn produced. - task_id: The Agentex task to create messages on. - - Returns: - The last assistant text emitted (useful as a span/turn output), or "". - """ - # Lazy imports so langchain isn't required at module load time. - from langchain_core.messages import AIMessage, ToolMessage - - from agentex.lib import adk - from agentex.types.text_content import TextContent - from agentex.types.tool_request_content import ToolRequestContent - from agentex.types.tool_response_content import ToolResponseContent - - final_text = "" - for message in messages: - if isinstance(message, AIMessage): - for tool_call in message.tool_calls or []: - await adk.messages.create( - task_id=task_id, - content=ToolRequestContent( - author="agent", - tool_call_id=tool_call["id"], - name=tool_call["name"], - arguments=tool_call["args"], - ), - ) - # ``content`` may be a plain string (OpenAI) or a list of content - # blocks (Anthropic/Claude via LangChain, e.g. - # ``[{"type": "text", "text": "..."}]``). Extract and join the text - # so the response is visible regardless of the underlying model. 
- if isinstance(message.content, str): - text = message.content - else: - text = "".join( - block.get("text", "") if isinstance(block, dict) else str(block) - for block in message.content - if not isinstance(block, dict) or block.get("type") == "text" - ) - if text: - final_text = text - await adk.messages.create( - task_id=task_id, - content=TextContent(author="agent", content=text, format="markdown"), - ) - elif isinstance(message, ToolMessage): - await adk.messages.create( - task_id=task_id, - content=ToolResponseContent( - author="agent", - tool_call_id=message.tool_call_id, - name=message.name or "unknown", - content=message.content - if isinstance(message.content, str) - else str(message.content), - ), - ) - return final_text diff --git a/src/agentex/lib/adk/_modules/_langgraph_sync.py b/src/agentex/lib/adk/_modules/_langgraph_sync.py index 48231a87d..9d7b73847 100644 --- a/src/agentex/lib/adk/_modules/_langgraph_sync.py +++ b/src/agentex/lib/adk/_modules/_langgraph_sync.py @@ -48,8 +48,8 @@ async def convert_langgraph_to_agentex_events( Supports both regular models (chunk.content is a str) and reasoning models like gpt-5/o1/o3 (chunk.content is a list of typed content blocks). - AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` (from - "updates" events), NOT Start+Delta+Done like pydantic-ai. No coalesce_tool_requests + LangGraph emits tool requests as ``StreamTaskMessageFull`` (from "updates" + events), NOT Start+Delta+Done like pydantic-ai. No coalesce_tool_requests option is needed for LangGraph. Args: @@ -271,3 +271,82 @@ async def convert_langgraph_to_agentex_events( yield StreamTaskMessageDone(type="done", index=message_index) if reasoning_streaming: yield StreamTaskMessageDone(type="done", index=message_index) + + +async def emit_langgraph_messages(messages: list[Any], task_id: str) -> str: + """Create Agentex messages for a list of LangGraph messages. + + This is the non-streaming counterpart to ``stream_langgraph_events``. 
Use it + when you run a LangGraph graph with ``ainvoke`` (for example a Temporal-backed + agent using the LangGraph plugin, where streaming deltas aren't available) and + want to surface the resulting messages to the Agentex UI after the fact. + + It maps LangGraph/LangChain message objects to Agentex content types: + + - ``AIMessage`` tool calls -> ``ToolRequestContent`` (one per call) + - ``AIMessage`` text content -> ``TextContent`` + - ``ToolMessage`` -> ``ToolResponseContent`` + + Pass only the messages produced this turn (e.g. ``messages[already_emitted:]``) + so each message is surfaced exactly once across a multi-turn conversation. + + Args: + messages: LangGraph/LangChain message objects to surface — typically + the new messages a turn produced. + task_id: The Agentex task to create messages on. + + Returns: + The last assistant text emitted (useful as a span/turn output), or "". + """ + # Lazy imports so langchain isn't required at module load time. + from langchain_core.messages import AIMessage, ToolMessage + + from agentex.lib import adk + from agentex.types.text_content import TextContent + from agentex.types.tool_request_content import ToolRequestContent + from agentex.types.tool_response_content import ToolResponseContent + + final_text = "" + for message in messages: + if isinstance(message, AIMessage): + for tool_call in message.tool_calls or []: + await adk.messages.create( + task_id=task_id, + content=ToolRequestContent( + author="agent", + tool_call_id=tool_call["id"], + name=tool_call["name"], + arguments=tool_call["args"], + ), + ) + # ``content`` may be a plain string (OpenAI) or a list of content + # blocks (Anthropic/Claude via LangChain, e.g. + # ``[{"type": "text", "text": "..."}]``). Extract and join the text + # so the response is visible regardless of the underlying model. 
+ if isinstance(message.content, str): + text = message.content + else: + text = "".join( + block.get("text", "") if isinstance(block, dict) else str(block) + for block in message.content + if not isinstance(block, dict) or block.get("type") == "text" + ) + if text: + final_text = text + await adk.messages.create( + task_id=task_id, + content=TextContent(author="agent", content=text, format="markdown"), + ) + elif isinstance(message, ToolMessage): + await adk.messages.create( + task_id=task_id, + content=ToolResponseContent( + author="agent", + tool_call_id=message.tool_call_id, + name=message.name or "unknown", + content=message.content + if isinstance(message.content, str) + else str(message.content), + ), + ) + return final_text diff --git a/src/agentex/lib/adk/_modules/_langgraph_tracing.py b/src/agentex/lib/adk/_modules/_langgraph_tracing.py deleted file mode 100644 index 2162201e1..000000000 --- a/src/agentex/lib/adk/_modules/_langgraph_tracing.py +++ /dev/null @@ -1,273 +0,0 @@ -"""LangChain callback handler that creates Agentex spans for LLM calls and tool executions. - -.. deprecated:: - ``AgentexLangGraphTracingHandler`` and ``create_langgraph_tracing_handler`` are - superseded by the unified harness surface (``LangGraphTurn`` + - ``UnifiedEmitter``), which derives spans automatically from the canonical - event stream without requiring a LangChain callback handler. - - They remain importable and functional for backward compatibility, but new - agents should use the unified path instead. -""" -# ruff: noqa: ARG002 -# Callback methods must accept all arguments defined by LangChain's AsyncCallbackHandler interface. 
- -from __future__ import annotations - -from uuid import UUID -from typing import Any, override - -from langchain_core.outputs import LLMResult -from langchain_core.messages import BaseMessage -from langchain_core.callbacks import AsyncCallbackHandler - -from agentex.types.span import Span -from agentex.lib.utils.logging import make_logger -from agentex.lib.adk._modules.tracing import TracingModule - -logger = make_logger(__name__) - - -class AgentexLangGraphTracingHandler(AsyncCallbackHandler): - """Async LangChain callback handler that records Agentex tracing spans. - - Creates child spans under a parent span for each LLM call and tool execution. - Designed to be passed via ``config={"callbacks": [handler]}`` to LangGraph's - ``graph.astream()`` or ``graph.ainvoke()``. - - Span hierarchy produced:: - - (e.g. "message" turn-level span) - ├── llm: (LLM call) - ├── tool: (tool execution) - └── llm: (LLM call) - - .. deprecated:: - Use ``LangGraphTurn`` with ``UnifiedEmitter`` instead. The unified - harness derives equivalent spans from the canonical event stream, - removing the need for a LangChain callback handler entirely. - """ - - def __init__( - self, - trace_id: str, - parent_span_id: str | None = None, - tracing: TracingModule | None = None, - ) -> None: - super().__init__() - self._trace_id = trace_id - self._parent_span_id = parent_span_id - # Lazily initialise TracingModule so the httpx client is created - # inside the *running* event-loop (not at import/construction time). 
- self._tracing_eager = tracing - self._tracing_lazy: TracingModule | None = None - # Map run_id → Span for in-flight spans - self._spans: dict[UUID, Span] = {} - - @property - def _tracing(self) -> TracingModule: - if self._tracing_eager is not None: - return self._tracing_eager - if self._tracing_lazy is None: - self._tracing_lazy = TracingModule() - return self._tracing_lazy - - # ------------------------------------------------------------------ - # LLM lifecycle - # ------------------------------------------------------------------ - - @override - async def on_chat_model_start( - self, - serialized: dict[str, Any], - messages: list[list[BaseMessage]], - *, - run_id: UUID, - parent_run_id: UUID | None = None, - tags: list[str] | None = None, - metadata: dict[str, Any] | None = None, - **kwargs: Any, - ) -> None: - model_name = (metadata or {}).get("ls_model_name", "") or _extract_model_name(serialized) - span = await self._tracing.start_span( - trace_id=self._trace_id, - name=f"llm:{model_name}" if model_name else "llm", - input=_serialize_messages(messages), - parent_id=self._parent_span_id, - data={"__span_type__": "COMPLETION"}, - ) - if span: - self._spans[run_id] = span - - @override - async def on_llm_end( - self, - response: LLMResult, - *, - run_id: UUID, - parent_run_id: UUID | None = None, - **kwargs: Any, - ) -> None: - span = self._spans.pop(run_id, None) - if span is None: - return - span.output = _serialize_llm_result(response) - await self._tracing.end_span(trace_id=self._trace_id, span=span) - - @override - async def on_llm_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: UUID | None = None, - **kwargs: Any, - ) -> None: - span = self._spans.pop(run_id, None) - if span is None: - return - span.output = {"error": str(error)} - await self._tracing.end_span(trace_id=self._trace_id, span=span) - - # ------------------------------------------------------------------ - # Tool lifecycle - # 
------------------------------------------------------------------ - - @override - async def on_tool_start( - self, - serialized: dict[str, Any], - input_str: str, - *, - run_id: UUID, - parent_run_id: UUID | None = None, - tags: list[str] | None = None, - metadata: dict[str, Any] | None = None, - inputs: dict[str, Any] | None = None, - **kwargs: Any, - ) -> None: - tool_name = serialized.get("name", "") or serialized.get("id", [""])[-1] - span = await self._tracing.start_span( - trace_id=self._trace_id, - name=f"tool:{tool_name}" if tool_name else "tool", - input={"input": input_str}, - parent_id=self._parent_span_id, - data={"__span_type__": "CUSTOM"}, - ) - if span: - self._spans[run_id] = span - - @override - async def on_tool_end( - self, - output: str, - *, - run_id: UUID, - parent_run_id: UUID | None = None, - **kwargs: Any, - ) -> None: - span = self._spans.pop(run_id, None) - if span is None: - return - span.output = {"output": output} - await self._tracing.end_span(trace_id=self._trace_id, span=span) - - @override - async def on_tool_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: UUID | None = None, - **kwargs: Any, - ) -> None: - span = self._spans.pop(run_id, None) - if span is None: - return - span.output = {"error": str(error)} - await self._tracing.end_span(trace_id=self._trace_id, span=span) - - -# ------------------------------------------------------------------ -# Helpers -# ------------------------------------------------------------------ - - -def _extract_model_name(serialized: dict[str, Any]) -> str: - """Best-effort model name extraction from the serialized callback dict.""" - kwargs = serialized.get("kwargs", {}) - return kwargs.get("model_name", "") or kwargs.get("model", "") - - -def _serialize_messages(messages: list[list[BaseMessage]]) -> dict[str, Any]: - """Serialize LangChain messages into a JSON-safe dict for the span input.""" - result: list[dict[str, Any]] = [] - for batch in messages: - for msg in 
batch: - entry: dict[str, Any] = {"type": msg.type, "content": msg.content} - tool_calls = getattr(msg, "tool_calls", None) - if tool_calls: - entry["tool_calls"] = tool_calls - result.append(entry) - return {"messages": result} - - -def _serialize_llm_result(response: LLMResult) -> dict[str, Any]: - """Serialize an LLMResult into a JSON-safe dict for the span output.""" - output: dict[str, Any] = {} - if response.generations: - last_gen = response.generations[-1] - if last_gen: - gen = last_gen[-1] - msg = getattr(gen, "message", None) - - # For reasoning models, content is a list of typed blocks. - # Extract text from the blocks instead of relying on gen.text. - if msg and isinstance(msg.content, list): - text_parts: list[str] = [] - for block in msg.content: - if isinstance(block, dict): - if block.get("type") == "text": - text_parts.append(block.get("text", "")) - output["content"] = "".join(text_parts) if text_parts else gen.text - else: - output["content"] = gen.text - - if msg and hasattr(msg, "tool_calls") and msg.tool_calls: - output["tool_calls"] = [{"name": tc["name"], "args": tc["args"]} for tc in msg.tool_calls] - return output - - -def create_langgraph_tracing_handler( - trace_id: str, - parent_span_id: str | None = None, -) -> AgentexLangGraphTracingHandler: - """Create a LangChain callback handler that records Agentex tracing spans. - - Pass the returned handler to LangGraph via ``config={"callbacks": [handler]}``. - - Args: - trace_id: The trace ID (typically the task/thread ID). - parent_span_id: Optional parent span ID to nest LLM/tool spans under. - - Returns: - An ``AgentexLangGraphTracingHandler`` instance ready to use as a LangChain callback. - - .. deprecated:: - Use ``LangGraphTurn`` with ``UnifiedEmitter`` instead. 
The unified harness - derives equivalent spans from the canonical event stream automatically, with - no LangChain callback required:: - - from agentex.lib.core.harness.emitter import UnifiedEmitter - from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn - - turn = LangGraphTurn(stream) - emitter = UnifiedEmitter(task_id=task_id, trace_id=trace_id, parent_span_id=span_id) - result = await emitter.auto_send_turn(turn) - - This function remains available for backward compatibility. - """ - return AgentexLangGraphTracingHandler( - trace_id=trace_id, - parent_span_id=parent_span_id, - ) diff --git a/src/agentex/lib/adk/_modules/_langgraph_turn.py b/src/agentex/lib/adk/_modules/_langgraph_turn.py index da8ff0e7c..a6e290e1b 100644 --- a/src/agentex/lib/adk/_modules/_langgraph_turn.py +++ b/src/agentex/lib/adk/_modules/_langgraph_turn.py @@ -4,9 +4,9 @@ ``langgraph_usage_to_turn_usage`` helper that maps LangGraph's ``AIMessage.usage_metadata`` onto the framework-agnostic ``TurnUsage`` model. -AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` events -(from "updates" events), NOT Start+Delta+Done like pydantic-ai. ``auto_send`` -handles Full events correctly; no coalescing wrapper is needed. +LangGraph emits tool requests as ``StreamTaskMessageFull`` events (from +"updates" events), NOT Start+Delta+Done like pydantic-ai. ``auto_send`` handles +Full events correctly; no coalescing wrapper is needed. 
""" from __future__ import annotations @@ -14,6 +14,7 @@ from typing import Any, AsyncIterator from collections.abc import AsyncGenerator +from agentex.lib.utils.temporal import workflow_now_if_in_workflow from agentex.lib.core.harness.types import TurnUsage, StreamTaskMessage from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events @@ -111,9 +112,9 @@ class LangGraphTurn: # Async / temporal result = await emitter.auto_send_turn(turn) - AGX1-377 note: LangGraph tool requests are ``StreamTaskMessageFull`` (from - "updates"), NOT Start+Delta+Done like pydantic-ai. No ``coalesce_tool_requests`` - option is needed. + LangGraph tool requests are ``StreamTaskMessageFull`` (from "updates"), NOT + Start+Delta+Done like pydantic-ai. No ``coalesce_tool_requests`` option is + needed. Usage data is captured lazily via the ``on_final_ai_message`` callback and is only valid after ``events`` has been fully consumed. Multi-step turns @@ -150,3 +151,50 @@ def usage(self) -> TurnUsage: did not report usage. """ return self._usage + + +async def stream_langgraph_events(stream, task_id: str) -> str: + """Stream LangGraph events to Agentex via Redis. + + Converts LangGraph ``graph.astream()`` events into Agentex streaming + updates and pushes them to Redis via ``adk.streaming`` contexts. For use + with async ACP agents that stream via Redis rather than HTTP yields. + + Processes the stream from graph.astream() called with + stream_mode=["messages", "updates"] and pushes text, reasoning, + tool request, and tool response messages through Redis streaming + contexts. + + Supports both regular models (chunk.content is a str) and reasoning + models like gpt-5/o1/o3 (chunk.content is a list of typed content blocks + in the Responses API responses/v1 format). + + Implemented on ``UnifiedEmitter.auto_send_turn(LangGraphTurn(...))`` for + cross-harness consistency, the same surface used by every other harness + adapter (pydantic-ai, openai-agents, etc.). 
The public signature and + return type are preserved identically. + + LangGraph emits tool requests as ``Full`` events (from "updates"), NOT + Start+Delta+Done like pydantic-ai. ``auto_send`` handles Full events + correctly; no coalescing wrapper is needed. + + ``created_at`` is set from ``workflow.now()`` when called inside a + Temporal workflow, matching the pattern used by the openai/litellm providers. + Outside a workflow (plain async activities, sync agents) it is ``None`` and the + server's wall clock is used. + + Args: + stream: Async iterator from graph.astream(..., stream_mode=["messages", "updates"]) + task_id: The Agentex task ID to stream messages to. + + Returns: + The accumulated final text output from the agent. + """ + from agentex.lib.core.harness.emitter import UnifiedEmitter + + # Stamp messages with workflow.now() inside Temporal for deterministic + # created_at ordering; falls back to None (server wall clock) outside a workflow. + turn = LangGraphTurn(stream, model=None) + emitter = UnifiedEmitter(task_id=task_id, trace_id=None, parent_span_id=None) + result = await emitter.auto_send_turn(turn, created_at=workflow_now_if_in_workflow()) + return result.final_text diff --git a/src/agentex/lib/core/harness/auto_send.py b/src/agentex/lib/core/harness/auto_send.py index 2ecd6b583..b645a4aae 100644 --- a/src/agentex/lib/core/harness/auto_send.py +++ b/src/agentex/lib/core/harness/auto_send.py @@ -52,11 +52,11 @@ async def auto_send( final_text_parts so that multi-step turns return the LAST text segment. Full(TextContent) also overwrites final_text_parts (same semantics). - AGX1-378: created_at is forwarded to every streaming_task_message_context - call so callers can back-date message timestamps. + created_at is forwarded to every streaming_task_message_context call so + callers can back-date message timestamps. 
Mirrors the open/close/stream_update pattern from - src/agentex/lib/adk/_modules/_langgraph_async.py: + src/agentex/lib/adk/_modules/_langgraph_turn.py: - context opened via streaming_task_message_context(...).__aenter__() - context closed via ctx.close() (not __aexit__) - deltas pushed as StreamTaskMessageDelta with parent_task_message set @@ -110,8 +110,8 @@ async def _close_all() -> None: ctx = ctx_map.get(event.index) if ctx is not None and event.delta is not None: # Reconstruct the delta with parent_task_message set from - # the context's task_message (mirrors _langgraph_async.py - # lines 72-78 and 117-127). + # the context's task_message (mirrors the legacy + # _langgraph_async streaming helper, now in _langgraph_turn.py). delta_with_parent = StreamTaskMessageDelta( parent_task_message=ctx.task_message, delta=event.delta, diff --git a/src/agentex/lib/core/harness/tracer.py b/src/agentex/lib/core/harness/tracer.py index 4ca4d628b..0c6167b76 100644 --- a/src/agentex/lib/core/harness/tracer.py +++ b/src/agentex/lib/core/harness/tracer.py @@ -24,7 +24,7 @@ class SpanTracer: The real TracingModule.end_span does NOT accept an output kwarg — output is recorded by mutating span.output before calling end_span, matching the pattern - used throughout the codebase (see _langgraph_tracing.py on_tool_end etc.). + used throughout the codebase. Span-lifecycle contract: the `_open` dict (span key -> span object) is scoped to a single turn. 
Pairing is by `key`: diff --git a/tests/lib/adk/test_langgraph_async.py b/tests/lib/adk/test_langgraph_async.py index 682bd43bc..ebe215a15 100644 --- a/tests/lib/adk/test_langgraph_async.py +++ b/tests/lib/adk/test_langgraph_async.py @@ -26,7 +26,7 @@ from agentex.types.text_content import TextContent from agentex.types.task_message_delta import TextDelta from agentex.types.task_message_update import StreamTaskMessageDelta -from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events +from agentex.lib.adk._modules._langgraph_turn import stream_langgraph_events TASK_ID = "task-test" diff --git a/tests/lib/adk/test_langgraph_sync.py b/tests/lib/adk/test_langgraph_sync.py index 248d18f68..9e8c6e4f0 100644 --- a/tests/lib/adk/test_langgraph_sync.py +++ b/tests/lib/adk/test_langgraph_sync.py @@ -1,10 +1,12 @@ -"""Tests for the sync LangGraph -> Agentex stream event converter. +"""Tests for the sync LangGraph -> Agentex path. Covers: -- Basic text, tool call, and tool response emission -- on_final_ai_message callback for usage capture -- create_langgraph_tracing_handler symbol is importable and functional - (runtime DeprecationWarning removed; deprecation is docstring-only) +- The bare converter ``convert_langgraph_to_agentex_events``: + * Basic text, tool call, and tool response emission + * on_final_ai_message callback for usage capture +- The unified sync (HTTP ACP) path ``UnifiedEmitter.yield_turn(LangGraphTurn(...))``: + * Passthrough: yield_turn events equal LangGraphTurn(stream).events + * Span derivation from Full tool events with a fake tracer NOTE: langchain_core imports must be deferred to test-function scope because conftest.py stubs out ``langchain_core.messages`` with MagicMock for ADK @@ -15,15 +17,20 @@ import sys from typing import Any, AsyncIterator +from datetime import datetime, timezone +from dataclasses import field, dataclass import pytest +from agentex.lib.core.harness.tracer import SpanTracer +from 
agentex.lib.core.harness.emitter import UnifiedEmitter from agentex.types.task_message_update import ( StreamTaskMessageFull, ) from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn # --------------------------------------------------------------------------- # Helpers @@ -227,21 +234,160 @@ def _cb(msg): assert yield_order.index("event") < yield_order.index("callback") -class TestLangGraphTracingHandlerBackwardCompat: - def test_create_langgraph_tracing_handler_no_runtime_warning(self): - """Deprecated symbol remains importable and emits no runtime DeprecationWarning. +# --------------------------------------------------------------------------- +# Unified sync path: LangGraphTurn + UnifiedEmitter.yield_turn +# +# Verifies the sync (HTTP ACP) delivery surface: +# 1. Passthrough: events from emitter.yield_turn(LangGraphTurn(stream)) equal +# LangGraphTurn(stream).events collected directly. +# 2. Span derivation: with trace_id + fake tracer, tool spans are derived from +# the event stream. 
+# --------------------------------------------------------------------------- + + +@dataclass +class _FakeTracingBackend: + spans_started: list[dict[str, Any]] = field(default_factory=list) + spans_ended: list[str] = field(default_factory=list) + + async def start_span(self, **kw) -> Any: + from agentex.types.span import Span + + sp = Span( + id=f"span-{len(self.spans_started) + 1}", + trace_id=kw.get("trace_id", "trace1"), + name=kw.get("name", ""), + start_time=datetime.now(tz=timezone.utc), + ) + self.spans_started.append(kw) + return sp + + async def end_span(self, *, trace_id: str, span: Any) -> None: + self.spans_ended.append(span.id if span else "") + + +class TestUnifiedSyncPathPassthrough: + async def test_yield_turn_events_equal_direct_events(self): + """Events from emitter.yield_turn(LangGraphTurn(stream)) must equal + LangGraphTurn(stream).events collected directly — the emitter must not + add, drop, or reorder events in yield mode.""" + from langchain_core.messages import AIMessage, AIMessageChunk + + chunk = AIMessageChunk(content="Hello!") + ai_msg = AIMessage(content="Hello!") - The runtime warnings.warn was removed (docstring-only deprecation) to - align with PR 4/6 and avoid breaking callers under warnings-as-errors. - Using ``warnings.simplefilter("error", DeprecationWarning)`` verifies - that calling the function is safe under -W error conditions. 
+ events_raw = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + + direct = [e async for e in LangGraphTurn(_make_stream(events_raw)).events] + + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + via_emitter = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + + assert len(direct) == len(via_emitter), "yield_turn must not add or drop events relative to direct iteration" + for a, b in zip(direct, via_emitter, strict=True): + assert type(a) == type(b), f"Event type mismatch: {type(a).__name__} vs {type(b).__name__}" + + async def test_yield_turn_passes_all_event_types(self): + """Start, Delta, Done, Full — each type is preserved.""" + from langchain_core.messages import AIMessage, AIMessageChunk + + chunk = AIMessageChunk(content="hi") + tc = {"id": "c1", "name": "t", "args": {}} + ai_msg = AIMessage(content="hi", tool_calls=[tc]) + + events_raw = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + out = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + types = {type(e).__name__ for e in out} + # text chunk emits Start + Delta + assert "StreamTaskMessageStart" in types + assert "StreamTaskMessageDelta" in types + # tool call emits Full + assert "StreamTaskMessageFull" in types + + async def test_empty_stream_yields_no_events(self): + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + out = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream([])))] + assert out == [] + + +class TestUnifiedSyncPathSpanDerivation: + @pytest.fixture + def fake_tracer(self): + backend = _FakeTracingBackend() + tracer = SpanTracer( + trace_id="trace1", + parent_span_id=None, + task_id="t", + tracing=backend, # type: ignore[arg-type] + ) + return tracer, backend + + async def test_tool_span_derived_from_full_events(self, 
fake_tracer): + """SpanDeriver handles Full tool events for LangGraph. + + Full(ToolRequestContent) opens a tool span keyed by tool_call_id; + Full(ToolResponseContent) closes it, aligning LangGraph's Full-event + path with the Start+Done harnesses (pydantic-ai, openai-agents). """ - import warnings + from langchain_core.messages import AIMessage, ToolMessage - from agentex.lib.adk._modules._langgraph_tracing import create_langgraph_tracing_handler + tracer, backend = fake_tracer + tc = {"id": "c1", "name": "get_weather", "args": {"city": "Paris"}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + tool_msg = ToolMessage(content="Sunny", tool_call_id="c1", name="get_weather") + + events_raw = [ + ("updates", {"agent": {"messages": [ai_msg]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ] - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("error", DeprecationWarning) - create_langgraph_tracing_handler(trace_id="t1", parent_span_id="p1") + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=tracer) + _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + + assert len(backend.spans_started) == 1, "Full(ToolRequestContent) opens one tool span" + started = backend.spans_started[0] + assert started["name"] == "get_weather" + assert started["input"] == {"city": "Paris"} + + async def test_no_spans_when_no_tool_calls(self, fake_tracer): + """yield_turn with tracer but no tool calls emits no spans.""" + from langchain_core.messages import AIMessage, AIMessageChunk + + tracer, backend = fake_tracer + chunk = AIMessageChunk(content="Hello!") + ai_msg = AIMessage(content="Hello!") + + events_raw = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=tracer) + _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + + assert 
backend.spans_started == [], "No tool spans when there are no tool calls" + + async def test_tracer_none_means_no_spans(self): + """With tracer=False, no spans should be emitted.""" + from langchain_core.messages import AIMessage, ToolMessage + + tc = {"id": "c1", "name": "t", "args": {}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + tool_msg = ToolMessage(content="ok", tool_call_id="c1", name="t") + + events_raw = [ + ("updates", {"agent": {"messages": [ai_msg]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ] - assert w == [], "create_langgraph_tracing_handler must NOT emit a runtime DeprecationWarning" + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=False) + _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + # No assertion on spans since tracer=False means emitter.tracer is None + assert emitter.tracer is None diff --git a/tests/lib/adk/test_langgraph_sync_unified.py b/tests/lib/adk/test_langgraph_sync_unified.py deleted file mode 100644 index cfd522828..000000000 --- a/tests/lib/adk/test_langgraph_sync_unified.py +++ /dev/null @@ -1,214 +0,0 @@ -"""Unified sync path tests for LangGraphTurn + UnifiedEmitter. - -Verifies: -1. Passthrough: events from emitter.yield_turn(LangGraphTurn(stream)) equal - LangGraphTurn(stream).events collected directly. -2. Span derivation: with trace_id + fake tracer, tool spans are derived from - the event stream. - -NOTE: langchain_core imports are deferred to test scope because conftest.py -stubs ``langchain_core.messages`` with MagicMock. 
-""" - -from __future__ import annotations - -import sys -from typing import Any -from datetime import datetime, timezone -from dataclasses import field, dataclass - -import pytest - -from agentex.lib.core.harness.tracer import SpanTracer -from agentex.lib.core.harness.emitter import UnifiedEmitter -from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn - -# --------------------------------------------------------------------------- -# Remove conftest stubs so real langchain_core types are used -# --------------------------------------------------------------------------- - - -@pytest.fixture(autouse=True) -def _real_langchain_core(): - stub_keys = [k for k in sys.modules if k.startswith("langchain_core") or k.startswith("langgraph")] - saved = {k: sys.modules.pop(k) for k in stub_keys} - import importlib - - importlib.import_module("langchain_core.messages") - yield - sys.modules.update(saved) - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - - -def _make_stream(events: list[tuple[str, Any]]): - async def _gen(): - for e in events: - yield e - - return _gen() - - -# --------------------------------------------------------------------------- -# Fake SpanTracer -# --------------------------------------------------------------------------- - - -@dataclass -class _FakeTracingBackend: - spans_started: list[dict[str, Any]] = field(default_factory=list) - spans_ended: list[str] = field(default_factory=list) - - async def start_span(self, **kw) -> Any: - from agentex.types.span import Span - - sp = Span( - id=f"span-{len(self.spans_started) + 1}", - trace_id=kw.get("trace_id", "trace1"), - name=kw.get("name", ""), - start_time=datetime.now(tz=timezone.utc), - ) - self.spans_started.append(kw) - return sp - - async def end_span(self, *, trace_id: str, span: Any) -> None: - self.spans_ended.append(span.id if span else "") - - -# 
--------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- - - -class TestPassthrough: - async def test_yield_turn_events_equal_direct_events(self): - """Events from emitter.yield_turn(LangGraphTurn(stream)) must equal - LangGraphTurn(stream).events collected directly — the emitter must not - add, drop, or reorder events in yield mode.""" - from langchain_core.messages import AIMessage, AIMessageChunk - - chunk = AIMessageChunk(content="Hello!") - ai_msg = AIMessage(content="Hello!") - - # Build two identical streams - events_raw = [ - ("messages", (chunk, {})), - ("updates", {"agent": {"messages": [ai_msg]}}), - ] - - # Direct collection - direct = [e async for e in LangGraphTurn(_make_stream(events_raw)).events] - - # Via emitter.yield_turn - emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) - via_emitter = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] - - assert len(direct) == len(via_emitter), "yield_turn must not add or drop events relative to direct iteration" - for a, b in zip(direct, via_emitter, strict=True): - assert type(a) == type(b), f"Event type mismatch: {type(a).__name__} vs {type(b).__name__}" - - async def test_yield_turn_passes_all_event_types(self): - """Start, Delta, Done, Full — each type is preserved.""" - from langchain_core.messages import AIMessage, AIMessageChunk - - chunk = AIMessageChunk(content="hi") - tc = {"id": "c1", "name": "t", "args": {}} - ai_msg = AIMessage(content="hi", tool_calls=[tc]) - - events_raw = [ - ("messages", (chunk, {})), - ("updates", {"agent": {"messages": [ai_msg]}}), - ] - emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) - out = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] - types = {type(e).__name__ for e in out} - # text chunk emits Start + Delta - assert "StreamTaskMessageStart" in types - 
assert "StreamTaskMessageDelta" in types - # tool call emits Full - assert "StreamTaskMessageFull" in types - - async def test_empty_stream_yields_no_events(self): - emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) - out = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream([])))] - assert out == [] - - -class TestSpanDerivation: - @pytest.fixture - def fake_tracer(self): - backend = _FakeTracingBackend() - tracer = SpanTracer( - trace_id="trace1", - parent_span_id=None, - task_id="t", - tracing=backend, # type: ignore[arg-type] - ) - return tracer, backend - - async def test_tool_span_derived_from_full_events(self, fake_tracer): - """AGX1-377: SpanDeriver now handles Full tool events for LangGraph. - - Full(ToolRequestContent) opens a tool span keyed by tool_call_id; - Full(ToolResponseContent) closes it. This bridges the previous gap where - LangGraph's Full-event path produced no spans, aligning it with - Start+Done harnesses (pydantic-ai, openai-agents). 
- """ - from langchain_core.messages import AIMessage, ToolMessage - - tracer, backend = fake_tracer - tc = {"id": "c1", "name": "get_weather", "args": {"city": "Paris"}} - ai_msg = AIMessage(content="", tool_calls=[tc]) - tool_msg = ToolMessage(content="Sunny", tool_call_id="c1", name="get_weather") - - events_raw = [ - ("updates", {"agent": {"messages": [ai_msg]}}), - ("updates", {"tools": {"messages": [tool_msg]}}), - ] - - emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=tracer) - _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] - - assert len(backend.spans_started) == 1, "Full(ToolRequestContent) opens one tool span" - started = backend.spans_started[0] - assert started["name"] == "get_weather" - assert started["input"] == {"city": "Paris"} - - async def test_no_spans_when_no_tool_calls(self, fake_tracer): - """yield_turn with tracer but no tool calls emits no spans.""" - from langchain_core.messages import AIMessage, AIMessageChunk - - tracer, backend = fake_tracer - chunk = AIMessageChunk(content="Hello!") - ai_msg = AIMessage(content="Hello!") - - events_raw = [ - ("messages", (chunk, {})), - ("updates", {"agent": {"messages": [ai_msg]}}), - ] - - emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=tracer) - _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] - - assert backend.spans_started == [], "No tool spans when there are no tool calls" - - async def test_tracer_none_means_no_spans(self): - """With tracer=False, no spans should be emitted.""" - from langchain_core.messages import AIMessage, ToolMessage - - tc = {"id": "c1", "name": "t", "args": {}} - ai_msg = AIMessage(content="", tool_calls=[tc]) - tool_msg = ToolMessage(content="ok", tool_call_id="c1", name="t") - - events_raw = [ - ("updates", {"agent": {"messages": [ai_msg]}}), - ("updates", {"tools": {"messages": [tool_msg]}}), - ] - - emitter = 
UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=False) - _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] - # No assertion on spans since tracer=False means emitter.tracer is None - assert emitter.tracer is None diff --git a/tests/lib/core/harness/test_harness_langgraph_async.py b/tests/lib/core/harness/test_harness_langgraph_async.py index 39bf5bc66..09e92102b 100644 --- a/tests/lib/core/harness/test_harness_langgraph_async.py +++ b/tests/lib/core/harness/test_harness_langgraph_async.py @@ -13,10 +13,10 @@ -------------- - The async handler pushes the correct sequence of messages to the fake streaming backend: Full(ToolRequest) + Full(ToolResponse) + text Start/Delta/Done. -- final_text accumulates all text (not just last segment — AGX1-377 unified behavior). +- final_text accumulates all text (not just last segment — unified behavior). - Tool messages go through streaming_task_message_context (not messages.create). -- With a SpanTracer, no tool spans are produced (AGX1-377: Full events are not - handled by SpanDeriver today). +- With a SpanTracer, Full tool events produce tool spans (request opens, response + closes), aligning LangGraph tracing with the Start+Done harnesses. 
What is NOT covered without live infrastructure ----------------------------------------------- @@ -46,6 +46,8 @@ from agentex.types.tool_response_content import ToolResponseContent from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn +from ._fakes import FakeTracing + # --------------------------------------------------------------------------- # Remove conftest stubs so real langchain_core types are used # --------------------------------------------------------------------------- @@ -102,30 +104,6 @@ def streaming_task_message_context(self, task_id: str, initial_content: Any, **k return ctx -# --------------------------------------------------------------------------- -# Fake tracing backend -# --------------------------------------------------------------------------- - - -class _FakeSpan: - def __init__(self, name: str) -> None: - self.name = name - self.output: Any = None - - -class _FakeTracing: - def __init__(self) -> None: - self.started: list[tuple[str, Any]] = [] - self.ended: list[tuple[str, Any]] = [] - - async def start_span(self, *, trace_id: str, name: str, **kw: Any) -> _FakeSpan: - self.started.append((name, kw.get("parent_id"))) - return _FakeSpan(name) - - async def end_span(self, *, trace_id: str, span: _FakeSpan) -> None: - self.ended.append((span.name, span.output)) - - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -142,9 +120,9 @@ async def _gen(): async def _run_auto_send_turn( stream_events: list[tuple[str, Any]], trace_id: str | None = None, -) -> tuple[TurnResult, _FakeStreaming, _FakeTracing | None]: +) -> tuple[TurnResult, _FakeStreaming, FakeTracing | None]: fake_streaming = _FakeStreaming() - fake_tracing = _FakeTracing() if trace_id else None + fake_tracing = FakeTracing() if trace_id else None tracer: SpanTracer | bool = False if trace_id and fake_tracing is not None: @@ -275,7 +253,7 @@ async def 
test_turn_usage_populated_after_events_consumed(self): assert usage.total_tokens == 15 async def test_tracer_produces_tool_spans_for_full_events(self): - """AGX1-377: SpanDeriver now handles Full tool events (request opens, response closes). + """SpanDeriver handles Full tool events (request opens, response closes). Full(ToolRequestContent) opens a tool span; Full(ToolResponseContent) closes it. This aligns LangGraph tracing with Start+Done harnesses (pydantic-ai, openai-agents). diff --git a/tests/lib/core/harness/test_harness_langgraph_sync.py b/tests/lib/core/harness/test_harness_langgraph_sync.py index 9f67dd2b6..67d213b6a 100644 --- a/tests/lib/core/harness/test_harness_langgraph_sync.py +++ b/tests/lib/core/harness/test_harness_langgraph_sync.py @@ -46,6 +46,8 @@ from agentex.types.tool_response_content import ToolResponseContent from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn +from ._fakes import FakeTracing + # --------------------------------------------------------------------------- # Remove conftest stubs so real langchain_core types are used # --------------------------------------------------------------------------- @@ -62,32 +64,6 @@ def _real_langchain_core(): sys.modules.update(saved) -# --------------------------------------------------------------------------- -# Fake tracing backend -# --------------------------------------------------------------------------- - - -class _FakeSpan: - def __init__(self, name: str) -> None: - self.name = name - self.output: Any = None - - -class _FakeTracing: - def __init__(self) -> None: - self.started: list[tuple[str, Any]] = [] - self.ended: list[tuple[str, Any]] = [] - - async def start_span( - self, *, trace_id: str, name: str, input: Any = None, parent_id: Any = None, **kw: Any - ) -> _FakeSpan: - self.started.append((name, parent_id)) - return _FakeSpan(name) - - async def end_span(self, *, trace_id: str, span: _FakeSpan) -> None: - self.ended.append((span.name, span.output)) - - # 
--------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -103,8 +79,8 @@ async def _gen(): async def _run_yield_turn( stream_events: list[tuple[str, Any]], trace_id: str | None = None -) -> tuple[list[Any], _FakeTracing | None]: - fake_tracing = _FakeTracing() if trace_id else None +) -> tuple[list[Any], FakeTracing | None]: + fake_tracing = FakeTracing() if trace_id else None tracer: SpanTracer | bool | None = None if trace_id and fake_tracing is not None: tracer = SpanTracer(trace_id=trace_id, parent_span_id=None, task_id="task1", tracing=fake_tracing) @@ -191,7 +167,7 @@ async def test_empty_stream_yields_nothing(self): assert out == [] async def test_tracer_produces_tool_spans_for_full_events(self): - """AGX1-377: SpanDeriver now handles Full tool events (request opens, response closes). + """SpanDeriver handles Full tool events (request opens, response closes). Full(ToolRequestContent) opens a tool span; Full(ToolResponseContent) closes it. This aligns LangGraph tracing with Start+Done harnesses (pydantic-ai, openai-agents). diff --git a/tests/lib/core/harness/test_harness_langgraph_temporal.py b/tests/lib/core/harness/test_harness_langgraph_temporal.py index 1a094a33c..219e92229 100644 --- a/tests/lib/core/harness/test_harness_langgraph_temporal.py +++ b/tests/lib/core/harness/test_harness_langgraph_temporal.py @@ -1,7 +1,7 @@ """Integration test: Temporal channel with a LangGraph agent. -The Temporal LangGraph agent pattern uses ``emit_langgraph_messages`` (from -``_langgraph_messages.py``) inside a Temporal activity. That module is not +The Temporal LangGraph agent pattern uses ``emit_langgraph_messages`` (now in +``_langgraph_sync.py``) inside a Temporal activity. That helper is not yet unified onto the harness surface (it has its own Redis-streaming code). 
This test file verifies the LangGraph Temporal agent's streaming behavior using @@ -43,8 +43,7 @@ from agentex.lib.core.harness.emitter import UnifiedEmitter from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent -from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn -from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn, stream_langgraph_events # --------------------------------------------------------------------------- # Remove conftest stubs so real langchain_core types are used From c98b52e93b9f16ffbfd37a04f60cce821ee153a0 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 12:49:22 -0400 Subject: [PATCH 2/2] docs(changelog): scope breaking-change to the LangGraph handler only Address Greptile review: the breaking-change entry listed create_pydantic_ai_tracing_handler as removed, but that handler is still exported here and is only removed in the following PR. Narrow this entry to the LangGraph handler actually removed in this change. Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8edf5fb60..f758931b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### ⚠ BREAKING CHANGES -* **harness:** removed the deprecated bespoke tracing handlers `create_langgraph_tracing_handler` / `create_pydantic_ai_tracing_handler` (and their `AgentexLangGraphTracingHandler` / `AgentexPydanticAITracingHandler` classes) from the public `agentex.lib.adk` surface. Span tracing is now derived from the canonical `StreamTaskMessage*` stream by `UnifiedEmitter` — wrap your run in the harness `*Turn` and drive `UnifiedEmitter.yield_turn` / `auto_send_turn`. The `agentex init` templates were migrated accordingly. 
+* **harness:** removed the deprecated bespoke LangGraph tracing handler `create_langgraph_tracing_handler` (and its `AgentexLangGraphTracingHandler` class) from the public `agentex.lib.adk` surface. Span tracing is now derived from the canonical `StreamTaskMessage*` stream by `UnifiedEmitter` — wrap your run in the harness `*Turn` and drive `UnifiedEmitter.yield_turn` / `auto_send_turn`. The `agentex init` templates were migrated accordingly. * **harness:** each harness now exposes exactly `_<harness>_sync.py` + `_<harness>_turn.py` under `agentex.lib.adk._modules`. The OpenAI harness `OpenAITurn` and `convert_openai_to_agentex_events` moved to `agentex.lib.adk._modules._openai_turn` / `_openai_sync`; back-compat shims remain at `agentex.lib.adk.providers._modules.{openai_turn,sync_provider}` for one release. Public facade names (`stream_pydantic_ai_events`, `stream_langgraph_events`, `emit_langgraph_messages`, etc.) are unchanged. ### Features