diff --git a/CHANGELOG.md b/CHANGELOG.md index a9b0590c8..f758931b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## Unreleased +### ⚠ BREAKING CHANGES + +* **harness:** removed the deprecated bespoke LangGraph tracing handler `create_langgraph_tracing_handler` (and its `AgentexLangGraphTracingHandler` class) from the public `agentex.lib.adk` surface. Span tracing is now derived from the canonical `StreamTaskMessage*` stream by `UnifiedEmitter` — wrap your run in the harness `*Turn` and drive `UnifiedEmitter.yield_turn` / `auto_send_turn`. The `agentex init` templates were migrated accordingly. +* **harness:** each harness now exposes exactly `__sync.py` + `__turn.py` under `agentex.lib.adk._modules`. The OpenAI harness `OpenAITurn` and `convert_openai_to_agentex_events` moved to `agentex.lib.adk._modules._openai_turn` / `_openai_sync`; back-compat shims remain at `agentex.lib.adk.providers._modules.{openai_turn,sync_provider}` for one release. Public facade names (`stream_pydantic_ai_events`, `stream_langgraph_events`, `emit_langgraph_messages`, etc.) are unchanged. + ### Features * **tracing:** emit OTel metrics for async span queue depth, batch drain, and SGP export success/failure (HTTP status labels). Disable SDK-side recording with ``AGENTEX_TRACING_METRICS=0``. diff --git a/adk/docs/harness.md b/adk/docs/harness.md index 6a9d8947a..d81835a03 100644 --- a/adk/docs/harness.md +++ b/adk/docs/harness.md @@ -39,14 +39,17 @@ Every harness tap produces a sequence of these. Everything downstream (delivery, ## Per-harness taps: `convert__to_agentex_events` -A tap is an async generator that translates the harness's native event stream into `StreamTaskMessage*` events. The currently shipped taps are: +A tap is an async generator that translates the harness's native event stream into `StreamTaskMessage*` events. The shipped taps are: | Harness | Tap function | Exported from | |---|---|---| | pydantic-ai | `convert_pydantic_ai_to_agentex_events` | `agentex.lib.adk` | | LangGraph | `convert_langgraph_to_agentex_events` | `agentex.lib.adk` | +| claude-code | `convert_claude_code_to_agentex_events` | `agentex.lib.adk` | +| codex | `convert_codex_to_agentex_events` | `agentex.lib.adk` | +| OpenAI Agents | `convert_openai_to_agentex_events` | `agentex.lib.adk.providers._modules.sync_provider` | -Taps for claude-code and codex will be added in subsequent PRs (AGX1-420, AGX1-421) and exported from `agentex.lib.adk` in the same way. +Each harness also provides a `HarnessTurn` wrapper that pairs its tap's event stream with usage extraction: `PydanticAITurn`, `LangGraphTurn`, `ClaudeCodeTurn`, `CodexTurn`, and `OpenAITurn`. --- @@ -157,11 +160,13 @@ Spans are derived from the canonical stream by `SpanDeriver` (pure, no `adk` dep ## Usage examples by channel -### Sync ACP (pydantic-ai tap) +### Sync ACP (`yield_turn`) + +Build the harness's `HarnessTurn` wrapper and iterate `emitter.yield_turn(turn)` — the emitter forwards each event to the caller and traces spans as a side effect: ```python import agentex.lib.adk as adk -from agentex.lib.adk import UnifiedEmitter, convert_pydantic_ai_to_agentex_events +from agentex.lib.adk import UnifiedEmitter, ClaudeCodeTurn @acp.on_message_send async def handle(params): @@ -172,13 +177,12 @@ async def handle(params): trace_id=task_id, parent_span_id=turn_span.id if turn_span else None, ) - tap = convert_pydantic_ai_to_agentex_events(pydantic_stream) - # wrap tap in a HarnessTurn then yield_turn, or yield directly: - async for event in tap: + turn = ClaudeCodeTurn(claude_code_stream) # any HarnessTurn + async for event in emitter.yield_turn(turn): yield event ``` -For the pre-unified sync path the tap is still yielded directly; `UnifiedEmitter.yield_turn` is the forward-looking integration point when a `HarnessTurn` wrapper is available. +Every harness follows the same shape — swap `ClaudeCodeTurn` for `PydanticAITurn`, `LangGraphTurn`, `CodexTurn`, or `OpenAITurn` and feed it that harness's native stream. ### Async Temporal (auto-send) diff --git a/src/agentex/lib/adk/__init__.py b/src/agentex/lib/adk/__init__.py index e618a20d3..86fa90253 100644 --- a/src/agentex/lib/adk/__init__.py +++ b/src/agentex/lib/adk/__init__.py @@ -6,11 +6,11 @@ from agentex.lib.adk._modules.agents import AgentsModule from agentex.lib.adk._modules.agent_task_tracker import AgentTaskTrackerModule from agentex.lib.adk._modules.checkpointer import create_checkpointer -from agentex.lib.adk._modules._langgraph_tracing import create_langgraph_tracing_handler -from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events -from agentex.lib.adk._modules._langgraph_messages import emit_langgraph_messages -from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events -from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn, stream_langgraph_events +from agentex.lib.adk._modules._langgraph_sync import ( + emit_langgraph_messages, + convert_langgraph_to_agentex_events, +) from agentex.lib.adk._modules._pydantic_ai_async import stream_pydantic_ai_events from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events from agentex.lib.adk._modules._pydantic_ai_tracing import create_pydantic_ai_tracing_handler @@ -68,7 +68,6 @@ "agent_task_tracker", # Checkpointing / LangGraph "create_checkpointer", - "create_langgraph_tracing_handler", "stream_langgraph_events", "emit_langgraph_messages", "convert_langgraph_to_agentex_events", diff --git a/src/agentex/lib/adk/_modules/_langgraph_async.py b/src/agentex/lib/adk/_modules/_langgraph_async.py deleted file mode 100644 index 02ef059eb..000000000 --- a/src/agentex/lib/adk/_modules/_langgraph_async.py +++ /dev/null @@ -1,65 +0,0 @@ -"""Async LangGraph streaming helper for Agentex. - -Converts LangGraph graph.astream() events into Agentex streaming updates -and pushes them to Redis via adk.streaming contexts. For use with async -ACP agents that stream via Redis rather than HTTP yields. - -Unified surface ---------------- -This module is now implemented on top of ``LangGraphTurn`` and -``UnifiedEmitter.auto_send_turn``, the same surface used by every other -harness adapter (pydantic-ai, openai-agents, etc.). The public signature -and return type are preserved identically. - -AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` events -(from "updates" events), NOT Start+Delta+Done like pydantic-ai. ``auto_send`` -handles Full events correctly; no coalescing wrapper is needed. -""" - -from agentex.lib.utils.temporal import workflow_now_if_in_workflow - - -async def stream_langgraph_events(stream, task_id: str) -> str: - """Stream LangGraph events to Agentex via Redis. - - Processes the stream from graph.astream() called with - stream_mode=["messages", "updates"] and pushes text, reasoning, - tool request, and tool response messages through Redis streaming - contexts. - - Supports both regular models (chunk.content is a str) and reasoning - models like gpt-5/o1/o3 (chunk.content is a list of typed content blocks - in the Responses API responses/v1 format). - - Reimplemented on ``UnifiedEmitter.auto_send_turn(LangGraphTurn(...))`` for - cross-harness consistency. Behavior is identical to the previous bespoke - implementation (verified by characterization tests in test_langgraph_async.py). - - AGX1-377 note: LangGraph emits tool requests as ``Full`` events (from "updates"), - NOT Start+Delta+Done like pydantic-ai. ``auto_send`` handles Full events - correctly; no coalescing wrapper is needed. - - AGX1-378 note: ``created_at`` is set from ``workflow.now()`` when called inside a - Temporal workflow, matching the pattern used by the openai/litellm providers. - Outside a workflow (plain async activities, sync agents) it is ``None`` and the - server's wall clock is used. - - Args: - stream: Async iterator from graph.astream(..., stream_mode=["messages", "updates"]) - task_id: The Agentex task ID to stream messages to. - - Returns: - The accumulated final text output from the agent. - """ - from agentex.lib.core.harness.emitter import UnifiedEmitter - from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn - - # AGX1-377 note: LangGraph emits tool requests as Full events (from "updates"), - # NOT Start+Delta+Done like pydantic-ai. auto_send handles Full events correctly; - # no coalescing wrapper is needed. - # AGX1-378: stamp messages with workflow.now() inside Temporal for deterministic - # created_at ordering; falls back to None (server wall clock) outside a workflow. - turn = LangGraphTurn(stream, model=None) - emitter = UnifiedEmitter(task_id=task_id, trace_id=None, parent_span_id=None) - result = await emitter.auto_send_turn(turn, created_at=workflow_now_if_in_workflow()) - return result.final_text diff --git a/src/agentex/lib/adk/_modules/_langgraph_messages.py b/src/agentex/lib/adk/_modules/_langgraph_messages.py deleted file mode 100644 index c8856755b..000000000 --- a/src/agentex/lib/adk/_modules/_langgraph_messages.py +++ /dev/null @@ -1,85 +0,0 @@ -"""Emit finished LangGraph messages as Agentex task messages. - -This is the non-streaming counterpart to ``stream_langgraph_events``. Use it -when you run a LangGraph graph with ``ainvoke`` (for example a Temporal-backed -agent using the LangGraph plugin, where streaming deltas aren't available) and -want to surface the resulting messages to the Agentex UI after the fact. - -It maps LangGraph/LangChain message objects to Agentex content types: - -- ``AIMessage`` tool calls → ``ToolRequestContent`` (one per call) -- ``AIMessage`` text content → ``TextContent`` -- ``ToolMessage`` → ``ToolResponseContent`` - -Pass only the messages produced this turn (e.g. ``messages[already_emitted:]``) -so each message is surfaced exactly once across a multi-turn conversation. -""" - -from __future__ import annotations - -from typing import Any - - -async def emit_langgraph_messages(messages: list[Any], task_id: str) -> str: - """Create Agentex messages for a list of LangGraph messages. - - Args: - messages: LangGraph/LangChain message objects to surface — typically - the new messages a turn produced. - task_id: The Agentex task to create messages on. - - Returns: - The last assistant text emitted (useful as a span/turn output), or "". - """ - # Lazy imports so langchain isn't required at module load time. - from langchain_core.messages import AIMessage, ToolMessage - - from agentex.lib import adk - from agentex.types.text_content import TextContent - from agentex.types.tool_request_content import ToolRequestContent - from agentex.types.tool_response_content import ToolResponseContent - - final_text = "" - for message in messages: - if isinstance(message, AIMessage): - for tool_call in message.tool_calls or []: - await adk.messages.create( - task_id=task_id, - content=ToolRequestContent( - author="agent", - tool_call_id=tool_call["id"], - name=tool_call["name"], - arguments=tool_call["args"], - ), - ) - # ``content`` may be a plain string (OpenAI) or a list of content - # blocks (Anthropic/Claude via LangChain, e.g. - # ``[{"type": "text", "text": "..."}]``). Extract and join the text - # so the response is visible regardless of the underlying model. - if isinstance(message.content, str): - text = message.content - else: - text = "".join( - block.get("text", "") if isinstance(block, dict) else str(block) - for block in message.content - if not isinstance(block, dict) or block.get("type") == "text" - ) - if text: - final_text = text - await adk.messages.create( - task_id=task_id, - content=TextContent(author="agent", content=text, format="markdown"), - ) - elif isinstance(message, ToolMessage): - await adk.messages.create( - task_id=task_id, - content=ToolResponseContent( - author="agent", - tool_call_id=message.tool_call_id, - name=message.name or "unknown", - content=message.content - if isinstance(message.content, str) - else str(message.content), - ), - ) - return final_text diff --git a/src/agentex/lib/adk/_modules/_langgraph_sync.py b/src/agentex/lib/adk/_modules/_langgraph_sync.py index 48231a87d..9d7b73847 100644 --- a/src/agentex/lib/adk/_modules/_langgraph_sync.py +++ b/src/agentex/lib/adk/_modules/_langgraph_sync.py @@ -48,8 +48,8 @@ async def convert_langgraph_to_agentex_events( Supports both regular models (chunk.content is a str) and reasoning models like gpt-5/o1/o3 (chunk.content is a list of typed content blocks). - AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` (from - "updates" events), NOT Start+Delta+Done like pydantic-ai. No coalesce_tool_requests + LangGraph emits tool requests as ``StreamTaskMessageFull`` (from "updates" + events), NOT Start+Delta+Done like pydantic-ai. No coalesce_tool_requests option is needed for LangGraph. Args: @@ -271,3 +271,82 @@ async def convert_langgraph_to_agentex_events( yield StreamTaskMessageDone(type="done", index=message_index) if reasoning_streaming: yield StreamTaskMessageDone(type="done", index=message_index) + + +async def emit_langgraph_messages(messages: list[Any], task_id: str) -> str: + """Create Agentex messages for a list of LangGraph messages. + + This is the non-streaming counterpart to ``stream_langgraph_events``. Use it + when you run a LangGraph graph with ``ainvoke`` (for example a Temporal-backed + agent using the LangGraph plugin, where streaming deltas aren't available) and + want to surface the resulting messages to the Agentex UI after the fact. + + It maps LangGraph/LangChain message objects to Agentex content types: + + - ``AIMessage`` tool calls -> ``ToolRequestContent`` (one per call) + - ``AIMessage`` text content -> ``TextContent`` + - ``ToolMessage`` -> ``ToolResponseContent`` + + Pass only the messages produced this turn (e.g. ``messages[already_emitted:]``) + so each message is surfaced exactly once across a multi-turn conversation. + + Args: + messages: LangGraph/LangChain message objects to surface — typically + the new messages a turn produced. + task_id: The Agentex task to create messages on. + + Returns: + The last assistant text emitted (useful as a span/turn output), or "". + """ + # Lazy imports so langchain isn't required at module load time. + from langchain_core.messages import AIMessage, ToolMessage + + from agentex.lib import adk + from agentex.types.text_content import TextContent + from agentex.types.tool_request_content import ToolRequestContent + from agentex.types.tool_response_content import ToolResponseContent + + final_text = "" + for message in messages: + if isinstance(message, AIMessage): + for tool_call in message.tool_calls or []: + await adk.messages.create( + task_id=task_id, + content=ToolRequestContent( + author="agent", + tool_call_id=tool_call["id"], + name=tool_call["name"], + arguments=tool_call["args"], + ), + ) + # ``content`` may be a plain string (OpenAI) or a list of content + # blocks (Anthropic/Claude via LangChain, e.g. + # ``[{"type": "text", "text": "..."}]``). Extract and join the text + # so the response is visible regardless of the underlying model. + if isinstance(message.content, str): + text = message.content + else: + text = "".join( + block.get("text", "") if isinstance(block, dict) else str(block) + for block in message.content + if not isinstance(block, dict) or block.get("type") == "text" + ) + if text: + final_text = text + await adk.messages.create( + task_id=task_id, + content=TextContent(author="agent", content=text, format="markdown"), + ) + elif isinstance(message, ToolMessage): + await adk.messages.create( + task_id=task_id, + content=ToolResponseContent( + author="agent", + tool_call_id=message.tool_call_id, + name=message.name or "unknown", + content=message.content + if isinstance(message.content, str) + else str(message.content), + ), + ) + return final_text diff --git a/src/agentex/lib/adk/_modules/_langgraph_tracing.py b/src/agentex/lib/adk/_modules/_langgraph_tracing.py deleted file mode 100644 index 2162201e1..000000000 --- a/src/agentex/lib/adk/_modules/_langgraph_tracing.py +++ /dev/null @@ -1,273 +0,0 @@ -"""LangChain callback handler that creates Agentex spans for LLM calls and tool executions. - -.. deprecated:: - ``AgentexLangGraphTracingHandler`` and ``create_langgraph_tracing_handler`` are - superseded by the unified harness surface (``LangGraphTurn`` + - ``UnifiedEmitter``), which derives spans automatically from the canonical - event stream without requiring a LangChain callback handler. - - They remain importable and functional for backward compatibility, but new - agents should use the unified path instead. -""" -# ruff: noqa: ARG002 -# Callback methods must accept all arguments defined by LangChain's AsyncCallbackHandler interface. - -from __future__ import annotations - -from uuid import UUID -from typing import Any, override - -from langchain_core.outputs import LLMResult -from langchain_core.messages import BaseMessage -from langchain_core.callbacks import AsyncCallbackHandler - -from agentex.types.span import Span -from agentex.lib.utils.logging import make_logger -from agentex.lib.adk._modules.tracing import TracingModule - -logger = make_logger(__name__) - - -class AgentexLangGraphTracingHandler(AsyncCallbackHandler): - """Async LangChain callback handler that records Agentex tracing spans. - - Creates child spans under a parent span for each LLM call and tool execution. - Designed to be passed via ``config={"callbacks": [handler]}`` to LangGraph's - ``graph.astream()`` or ``graph.ainvoke()``. - - Span hierarchy produced:: - - (e.g. "message" turn-level span) - ├── llm: (LLM call) - ├── tool: (tool execution) - └── llm: (LLM call) - - .. deprecated:: - Use ``LangGraphTurn`` with ``UnifiedEmitter`` instead. The unified - harness derives equivalent spans from the canonical event stream, - removing the need for a LangChain callback handler entirely. - """ - - def __init__( - self, - trace_id: str, - parent_span_id: str | None = None, - tracing: TracingModule | None = None, - ) -> None: - super().__init__() - self._trace_id = trace_id - self._parent_span_id = parent_span_id - # Lazily initialise TracingModule so the httpx client is created - # inside the *running* event-loop (not at import/construction time). - self._tracing_eager = tracing - self._tracing_lazy: TracingModule | None = None - # Map run_id → Span for in-flight spans - self._spans: dict[UUID, Span] = {} - - @property - def _tracing(self) -> TracingModule: - if self._tracing_eager is not None: - return self._tracing_eager - if self._tracing_lazy is None: - self._tracing_lazy = TracingModule() - return self._tracing_lazy - - # ------------------------------------------------------------------ - # LLM lifecycle - # ------------------------------------------------------------------ - - @override - async def on_chat_model_start( - self, - serialized: dict[str, Any], - messages: list[list[BaseMessage]], - *, - run_id: UUID, - parent_run_id: UUID | None = None, - tags: list[str] | None = None, - metadata: dict[str, Any] | None = None, - **kwargs: Any, - ) -> None: - model_name = (metadata or {}).get("ls_model_name", "") or _extract_model_name(serialized) - span = await self._tracing.start_span( - trace_id=self._trace_id, - name=f"llm:{model_name}" if model_name else "llm", - input=_serialize_messages(messages), - parent_id=self._parent_span_id, - data={"__span_type__": "COMPLETION"}, - ) - if span: - self._spans[run_id] = span - - @override - async def on_llm_end( - self, - response: LLMResult, - *, - run_id: UUID, - parent_run_id: UUID | None = None, - **kwargs: Any, - ) -> None: - span = self._spans.pop(run_id, None) - if span is None: - return - span.output = _serialize_llm_result(response) - await self._tracing.end_span(trace_id=self._trace_id, span=span) - - @override - async def on_llm_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: UUID | None = None, - **kwargs: Any, - ) -> None: - span = self._spans.pop(run_id, None) - if span is None: - return - span.output = {"error": str(error)} - await self._tracing.end_span(trace_id=self._trace_id, span=span) - - # ------------------------------------------------------------------ - # Tool lifecycle - # ------------------------------------------------------------------ - - @override - async def on_tool_start( - self, - serialized: dict[str, Any], - input_str: str, - *, - run_id: UUID, - parent_run_id: UUID | None = None, - tags: list[str] | None = None, - metadata: dict[str, Any] | None = None, - inputs: dict[str, Any] | None = None, - **kwargs: Any, - ) -> None: - tool_name = serialized.get("name", "") or serialized.get("id", [""])[-1] - span = await self._tracing.start_span( - trace_id=self._trace_id, - name=f"tool:{tool_name}" if tool_name else "tool", - input={"input": input_str}, - parent_id=self._parent_span_id, - data={"__span_type__": "CUSTOM"}, - ) - if span: - self._spans[run_id] = span - - @override - async def on_tool_end( - self, - output: str, - *, - run_id: UUID, - parent_run_id: UUID | None = None, - **kwargs: Any, - ) -> None: - span = self._spans.pop(run_id, None) - if span is None: - return - span.output = {"output": output} - await self._tracing.end_span(trace_id=self._trace_id, span=span) - - @override - async def on_tool_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: UUID | None = None, - **kwargs: Any, - ) -> None: - span = self._spans.pop(run_id, None) - if span is None: - return - span.output = {"error": str(error)} - await self._tracing.end_span(trace_id=self._trace_id, span=span) - - -# ------------------------------------------------------------------ -# Helpers -# ------------------------------------------------------------------ - - -def _extract_model_name(serialized: dict[str, Any]) -> str: - """Best-effort model name extraction from the serialized callback dict.""" - kwargs = serialized.get("kwargs", {}) - return kwargs.get("model_name", "") or kwargs.get("model", "") - - -def _serialize_messages(messages: list[list[BaseMessage]]) -> dict[str, Any]: - """Serialize LangChain messages into a JSON-safe dict for the span input.""" - result: list[dict[str, Any]] = [] - for batch in messages: - for msg in batch: - entry: dict[str, Any] = {"type": msg.type, "content": msg.content} - tool_calls = getattr(msg, "tool_calls", None) - if tool_calls: - entry["tool_calls"] = tool_calls - result.append(entry) - return {"messages": result} - - -def _serialize_llm_result(response: LLMResult) -> dict[str, Any]: - """Serialize an LLMResult into a JSON-safe dict for the span output.""" - output: dict[str, Any] = {} - if response.generations: - last_gen = response.generations[-1] - if last_gen: - gen = last_gen[-1] - msg = getattr(gen, "message", None) - - # For reasoning models, content is a list of typed blocks. - # Extract text from the blocks instead of relying on gen.text. - if msg and isinstance(msg.content, list): - text_parts: list[str] = [] - for block in msg.content: - if isinstance(block, dict): - if block.get("type") == "text": - text_parts.append(block.get("text", "")) - output["content"] = "".join(text_parts) if text_parts else gen.text - else: - output["content"] = gen.text - - if msg and hasattr(msg, "tool_calls") and msg.tool_calls: - output["tool_calls"] = [{"name": tc["name"], "args": tc["args"]} for tc in msg.tool_calls] - return output - - -def create_langgraph_tracing_handler( - trace_id: str, - parent_span_id: str | None = None, -) -> AgentexLangGraphTracingHandler: - """Create a LangChain callback handler that records Agentex tracing spans. - - Pass the returned handler to LangGraph via ``config={"callbacks": [handler]}``. - - Args: - trace_id: The trace ID (typically the task/thread ID). - parent_span_id: Optional parent span ID to nest LLM/tool spans under. - - Returns: - An ``AgentexLangGraphTracingHandler`` instance ready to use as a LangChain callback. - - .. deprecated:: - Use ``LangGraphTurn`` with ``UnifiedEmitter`` instead. The unified harness - derives equivalent spans from the canonical event stream automatically, with - no LangChain callback required:: - - from agentex.lib.core.harness.emitter import UnifiedEmitter - from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn - - turn = LangGraphTurn(stream) - emitter = UnifiedEmitter(task_id=task_id, trace_id=trace_id, parent_span_id=span_id) - result = await emitter.auto_send_turn(turn) - - This function remains available for backward compatibility. - """ - return AgentexLangGraphTracingHandler( - trace_id=trace_id, - parent_span_id=parent_span_id, - ) diff --git a/src/agentex/lib/adk/_modules/_langgraph_turn.py b/src/agentex/lib/adk/_modules/_langgraph_turn.py index da8ff0e7c..a6e290e1b 100644 --- a/src/agentex/lib/adk/_modules/_langgraph_turn.py +++ b/src/agentex/lib/adk/_modules/_langgraph_turn.py @@ -4,9 +4,9 @@ ``langgraph_usage_to_turn_usage`` helper that maps LangGraph's ``AIMessage.usage_metadata`` onto the framework-agnostic ``TurnUsage`` model. -AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` events -(from "updates" events), NOT Start+Delta+Done like pydantic-ai. ``auto_send`` -handles Full events correctly; no coalescing wrapper is needed. +LangGraph emits tool requests as ``StreamTaskMessageFull`` events (from +"updates" events), NOT Start+Delta+Done like pydantic-ai. ``auto_send`` handles +Full events correctly; no coalescing wrapper is needed. """ from __future__ import annotations @@ -14,6 +14,7 @@ from typing import Any, AsyncIterator from collections.abc import AsyncGenerator +from agentex.lib.utils.temporal import workflow_now_if_in_workflow from agentex.lib.core.harness.types import TurnUsage, StreamTaskMessage from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events @@ -111,9 +112,9 @@ class LangGraphTurn: # Async / temporal result = await emitter.auto_send_turn(turn) - AGX1-377 note: LangGraph tool requests are ``StreamTaskMessageFull`` (from - "updates"), NOT Start+Delta+Done like pydantic-ai. No ``coalesce_tool_requests`` - option is needed. + LangGraph tool requests are ``StreamTaskMessageFull`` (from "updates"), NOT + Start+Delta+Done like pydantic-ai. No ``coalesce_tool_requests`` option is + needed. Usage data is captured lazily via the ``on_final_ai_message`` callback and is only valid after ``events`` has been fully consumed. Multi-step turns @@ -150,3 +151,50 @@ def usage(self) -> TurnUsage: did not report usage. """ return self._usage + + +async def stream_langgraph_events(stream, task_id: str) -> str: + """Stream LangGraph events to Agentex via Redis. + + Converts LangGraph ``graph.astream()`` events into Agentex streaming + updates and pushes them to Redis via ``adk.streaming`` contexts. For use + with async ACP agents that stream via Redis rather than HTTP yields. + + Processes the stream from graph.astream() called with + stream_mode=["messages", "updates"] and pushes text, reasoning, + tool request, and tool response messages through Redis streaming + contexts. + + Supports both regular models (chunk.content is a str) and reasoning + models like gpt-5/o1/o3 (chunk.content is a list of typed content blocks + in the Responses API responses/v1 format). + + Implemented on ``UnifiedEmitter.auto_send_turn(LangGraphTurn(...))`` for + cross-harness consistency, the same surface used by every other harness + adapter (pydantic-ai, openai-agents, etc.). The public signature and + return type are preserved identically. + + LangGraph emits tool requests as ``Full`` events (from "updates"), NOT + Start+Delta+Done like pydantic-ai. ``auto_send`` handles Full events + correctly; no coalescing wrapper is needed. + + ``created_at`` is set from ``workflow.now()`` when called inside a + Temporal workflow, matching the pattern used by the openai/litellm providers. + Outside a workflow (plain async activities, sync agents) it is ``None`` and the + server's wall clock is used. + + Args: + stream: Async iterator from graph.astream(..., stream_mode=["messages", "updates"]) + task_id: The Agentex task ID to stream messages to. + + Returns: + The accumulated final text output from the agent. + """ + from agentex.lib.core.harness.emitter import UnifiedEmitter + + # Stamp messages with workflow.now() inside Temporal for deterministic + # created_at ordering; falls back to None (server wall clock) outside a workflow. + turn = LangGraphTurn(stream, model=None) + emitter = UnifiedEmitter(task_id=task_id, trace_id=None, parent_span_id=None) + result = await emitter.auto_send_turn(turn, created_at=workflow_now_if_in_workflow()) + return result.final_text diff --git a/src/agentex/lib/core/harness/auto_send.py b/src/agentex/lib/core/harness/auto_send.py index 2ecd6b583..b645a4aae 100644 --- a/src/agentex/lib/core/harness/auto_send.py +++ b/src/agentex/lib/core/harness/auto_send.py @@ -52,11 +52,11 @@ async def auto_send( final_text_parts so that multi-step turns return the LAST text segment. Full(TextContent) also overwrites final_text_parts (same semantics). - AGX1-378: created_at is forwarded to every streaming_task_message_context - call so callers can back-date message timestamps. + created_at is forwarded to every streaming_task_message_context call so + callers can back-date message timestamps. Mirrors the open/close/stream_update pattern from - src/agentex/lib/adk/_modules/_langgraph_async.py: + src/agentex/lib/adk/_modules/_langgraph_turn.py: - context opened via streaming_task_message_context(...).__aenter__() - context closed via ctx.close() (not __aexit__) - deltas pushed as StreamTaskMessageDelta with parent_task_message set @@ -110,8 +110,8 @@ async def _close_all() -> None: ctx = ctx_map.get(event.index) if ctx is not None and event.delta is not None: # Reconstruct the delta with parent_task_message set from - # the context's task_message (mirrors _langgraph_async.py - # lines 72-78 and 117-127). + # the context's task_message (mirrors the legacy + # _langgraph_async streaming helper, now in _langgraph_turn.py). delta_with_parent = StreamTaskMessageDelta( parent_task_message=ctx.task_message, delta=event.delta, diff --git a/src/agentex/lib/core/harness/tracer.py b/src/agentex/lib/core/harness/tracer.py index 4ca4d628b..0c6167b76 100644 --- a/src/agentex/lib/core/harness/tracer.py +++ b/src/agentex/lib/core/harness/tracer.py @@ -24,7 +24,7 @@ class SpanTracer: The real TracingModule.end_span does NOT accept an output kwarg — output is recorded by mutating span.output before calling end_span, matching the pattern - used throughout the codebase (see _langgraph_tracing.py on_tool_end etc.). + used throughout the codebase. Span-lifecycle contract: the `_open` dict (span key -> span object) is scoped to a single turn. Pairing is by `key`: diff --git a/tests/lib/adk/test_langgraph_async.py b/tests/lib/adk/test_langgraph_async.py index 682bd43bc..ebe215a15 100644 --- a/tests/lib/adk/test_langgraph_async.py +++ b/tests/lib/adk/test_langgraph_async.py @@ -26,7 +26,7 @@ from agentex.types.text_content import TextContent from agentex.types.task_message_delta import TextDelta from agentex.types.task_message_update import StreamTaskMessageDelta -from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events +from agentex.lib.adk._modules._langgraph_turn import stream_langgraph_events TASK_ID = "task-test" diff --git a/tests/lib/adk/test_langgraph_sync.py b/tests/lib/adk/test_langgraph_sync.py index 248d18f68..9e8c6e4f0 100644 --- a/tests/lib/adk/test_langgraph_sync.py +++ b/tests/lib/adk/test_langgraph_sync.py @@ -1,10 +1,12 @@ -"""Tests for the sync LangGraph -> Agentex stream event converter. +"""Tests for the sync LangGraph -> Agentex path. Covers: -- Basic text, tool call, and tool response emission -- on_final_ai_message callback for usage capture -- create_langgraph_tracing_handler symbol is importable and functional - (runtime DeprecationWarning removed; deprecation is docstring-only) +- The bare converter ``convert_langgraph_to_agentex_events``: + * Basic text, tool call, and tool response emission + * on_final_ai_message callback for usage capture +- The unified sync (HTTP ACP) path ``UnifiedEmitter.yield_turn(LangGraphTurn(...))``: + * Passthrough: yield_turn events equal LangGraphTurn(stream).events + * Span derivation from Full tool events with a fake tracer NOTE: langchain_core imports must be deferred to test-function scope because conftest.py stubs out ``langchain_core.messages`` with MagicMock for ADK @@ -15,15 +17,20 @@ import sys from typing import Any, AsyncIterator +from datetime import datetime, timezone +from dataclasses import field, dataclass import pytest +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter from agentex.types.task_message_update import ( StreamTaskMessageFull, ) from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn # --------------------------------------------------------------------------- # Helpers @@ -227,21 +234,160 @@ def _cb(msg): assert yield_order.index("event") < yield_order.index("callback") -class TestLangGraphTracingHandlerBackwardCompat: - def test_create_langgraph_tracing_handler_no_runtime_warning(self): - """Deprecated symbol remains importable and emits no runtime DeprecationWarning. +# --------------------------------------------------------------------------- +# Unified sync path: LangGraphTurn + UnifiedEmitter.yield_turn +# +# Verifies the sync (HTTP ACP) delivery surface: +# 1. Passthrough: events from emitter.yield_turn(LangGraphTurn(stream)) equal +# LangGraphTurn(stream).events collected directly. +# 2. Span derivation: with trace_id + fake tracer, tool spans are derived from +# the event stream. +# --------------------------------------------------------------------------- + + +@dataclass +class _FakeTracingBackend: + spans_started: list[dict[str, Any]] = field(default_factory=list) + spans_ended: list[str] = field(default_factory=list) + + async def start_span(self, **kw) -> Any: + from agentex.types.span import Span + + sp = Span( + id=f"span-{len(self.spans_started) + 1}", + trace_id=kw.get("trace_id", "trace1"), + name=kw.get("name", ""), + start_time=datetime.now(tz=timezone.utc), + ) + self.spans_started.append(kw) + return sp + + async def end_span(self, *, trace_id: str, span: Any) -> None: + self.spans_ended.append(span.id if span else "") + + +class TestUnifiedSyncPathPassthrough: + async def test_yield_turn_events_equal_direct_events(self): + """Events from emitter.yield_turn(LangGraphTurn(stream)) must equal + LangGraphTurn(stream).events collected directly — the emitter must not + add, drop, or reorder events in yield mode.""" + from langchain_core.messages import AIMessage, AIMessageChunk + + chunk = AIMessageChunk(content="Hello!") + ai_msg = AIMessage(content="Hello!") - The runtime warnings.warn was removed (docstring-only deprecation) to - align with PR 4/6 and avoid breaking callers under warnings-as-errors. - Using ``warnings.simplefilter("error", DeprecationWarning)`` verifies - that calling the function is safe under -W error conditions. + events_raw = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + + direct = [e async for e in LangGraphTurn(_make_stream(events_raw)).events] + + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + via_emitter = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + + assert len(direct) == len(via_emitter), "yield_turn must not add or drop events relative to direct iteration" + for a, b in zip(direct, via_emitter, strict=True): + assert type(a) == type(b), f"Event type mismatch: {type(a).__name__} vs {type(b).__name__}" + + async def test_yield_turn_passes_all_event_types(self): + """Start, Delta, Done, Full — each type is preserved.""" + from langchain_core.messages import AIMessage, AIMessageChunk + + chunk = AIMessageChunk(content="hi") + tc = {"id": "c1", "name": "t", "args": {}} + ai_msg = AIMessage(content="hi", tool_calls=[tc]) + + events_raw = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + out = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + types = {type(e).__name__ for e in out} + # text chunk emits Start + Delta + assert "StreamTaskMessageStart" in types + assert "StreamTaskMessageDelta" in types + # tool call emits Full + assert "StreamTaskMessageFull" in types + + async def test_empty_stream_yields_no_events(self): + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + out = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream([])))] + assert out == [] + + +class TestUnifiedSyncPathSpanDerivation: + @pytest.fixture + def fake_tracer(self): + backend = _FakeTracingBackend() + tracer = SpanTracer( + trace_id="trace1", + parent_span_id=None, + task_id="t", + tracing=backend, # type: ignore[arg-type] + ) + return tracer, backend + + async def test_tool_span_derived_from_full_events(self, fake_tracer): + """SpanDeriver handles Full tool events for LangGraph. + + Full(ToolRequestContent) opens a tool span keyed by tool_call_id; + Full(ToolResponseContent) closes it, aligning LangGraph's Full-event + path with the Start+Done harnesses (pydantic-ai, openai-agents). """ - import warnings + from langchain_core.messages import AIMessage, ToolMessage - from agentex.lib.adk._modules._langgraph_tracing import create_langgraph_tracing_handler + tracer, backend = fake_tracer + tc = {"id": "c1", "name": "get_weather", "args": {"city": "Paris"}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + tool_msg = ToolMessage(content="Sunny", tool_call_id="c1", name="get_weather") + + events_raw = [ + ("updates", {"agent": {"messages": [ai_msg]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ] - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("error", DeprecationWarning) - create_langgraph_tracing_handler(trace_id="t1", parent_span_id="p1") + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=tracer) + _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + + assert len(backend.spans_started) == 1, "Full(ToolRequestContent) opens one tool span" + started = backend.spans_started[0] + assert started["name"] == "get_weather" + assert started["input"] == {"city": "Paris"} + + async def test_no_spans_when_no_tool_calls(self, fake_tracer): + """yield_turn with tracer but no tool calls emits no spans.""" + from langchain_core.messages import AIMessage, AIMessageChunk + + tracer, backend = fake_tracer + chunk = AIMessageChunk(content="Hello!") + ai_msg = AIMessage(content="Hello!") + + events_raw = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=tracer) + _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + + assert backend.spans_started == [], "No tool spans when there are no tool calls" + + async def test_tracer_none_means_no_spans(self): + """With tracer=False, no spans should be emitted.""" + from langchain_core.messages import AIMessage, ToolMessage + + tc = {"id": "c1", "name": "t", "args": {}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + tool_msg = ToolMessage(content="ok", tool_call_id="c1", name="t") + + events_raw = [ + ("updates", {"agent": {"messages": [ai_msg]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ] - assert w == [], "create_langgraph_tracing_handler must NOT emit a runtime DeprecationWarning" + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=False) + _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + # No assertion on spans since tracer=False means emitter.tracer is None + assert emitter.tracer is None diff --git a/tests/lib/adk/test_langgraph_sync_unified.py b/tests/lib/adk/test_langgraph_sync_unified.py deleted file mode 100644 index cfd522828..000000000 --- a/tests/lib/adk/test_langgraph_sync_unified.py +++ /dev/null @@ -1,214 +0,0 @@ -"""Unified sync path tests for LangGraphTurn + UnifiedEmitter. - -Verifies: -1. Passthrough: events from emitter.yield_turn(LangGraphTurn(stream)) equal - LangGraphTurn(stream).events collected directly. -2. Span derivation: with trace_id + fake tracer, tool spans are derived from - the event stream. - -NOTE: langchain_core imports are deferred to test scope because conftest.py -stubs ``langchain_core.messages`` with MagicMock. -""" - -from __future__ import annotations - -import sys -from typing import Any -from datetime import datetime, timezone -from dataclasses import field, dataclass - -import pytest - -from agentex.lib.core.harness.tracer import SpanTracer -from agentex.lib.core.harness.emitter import UnifiedEmitter -from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn - -# --------------------------------------------------------------------------- -# Remove conftest stubs so real langchain_core types are used -# --------------------------------------------------------------------------- - - -@pytest.fixture(autouse=True) -def _real_langchain_core(): - stub_keys = [k for k in sys.modules if k.startswith("langchain_core") or k.startswith("langgraph")] - saved = {k: sys.modules.pop(k) for k in stub_keys} - import importlib - - importlib.import_module("langchain_core.messages") - yield - sys.modules.update(saved) - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - - -def _make_stream(events: list[tuple[str, Any]]): - async def _gen(): - for e in events: - yield e - - return _gen() - - -# --------------------------------------------------------------------------- -# Fake SpanTracer -# --------------------------------------------------------------------------- - - -@dataclass -class _FakeTracingBackend: - spans_started: list[dict[str, Any]] = field(default_factory=list) - spans_ended: list[str] = field(default_factory=list) - - async def start_span(self, **kw) -> Any: - from agentex.types.span import Span - - sp = Span( - id=f"span-{len(self.spans_started) + 1}", - trace_id=kw.get("trace_id", "trace1"), - name=kw.get("name", ""), - start_time=datetime.now(tz=timezone.utc), - ) - self.spans_started.append(kw) - return sp - - async def end_span(self, *, trace_id: str, span: Any) -> None: - self.spans_ended.append(span.id if span else "") - - -# --------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- - - -class TestPassthrough: - async def test_yield_turn_events_equal_direct_events(self): - """Events from emitter.yield_turn(LangGraphTurn(stream)) must equal - LangGraphTurn(stream).events collected directly — the emitter must not - add, drop, or reorder events in yield mode.""" - from langchain_core.messages import AIMessage, AIMessageChunk - - chunk = AIMessageChunk(content="Hello!") - ai_msg = AIMessage(content="Hello!") - - # Build two identical streams - events_raw = [ - ("messages", (chunk, {})), - ("updates", {"agent": {"messages": [ai_msg]}}), - ] - - # Direct collection - direct = [e async for e in LangGraphTurn(_make_stream(events_raw)).events] - - # Via emitter.yield_turn - emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) - via_emitter = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] - - assert len(direct) == len(via_emitter), "yield_turn must not add or drop events relative to direct iteration" - for a, b in zip(direct, via_emitter, strict=True): - assert type(a) == type(b), f"Event type mismatch: {type(a).__name__} vs {type(b).__name__}" - - async def test_yield_turn_passes_all_event_types(self): - """Start, Delta, Done, Full — each type is preserved.""" - from langchain_core.messages import AIMessage, AIMessageChunk - - chunk = AIMessageChunk(content="hi") - tc = {"id": "c1", "name": "t", "args": {}} - ai_msg = AIMessage(content="hi", tool_calls=[tc]) - - events_raw = [ - ("messages", (chunk, {})), - ("updates", {"agent": {"messages": [ai_msg]}}), - ] - emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) - out = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] - types = {type(e).__name__ for e in out} - # text chunk emits Start + Delta - assert "StreamTaskMessageStart" in types - assert "StreamTaskMessageDelta" in types - # tool call emits Full - assert "StreamTaskMessageFull" in types - - async def test_empty_stream_yields_no_events(self): - emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) - out = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream([])))] - assert out == [] - - -class TestSpanDerivation: - @pytest.fixture - def fake_tracer(self): - backend = _FakeTracingBackend() - tracer = SpanTracer( - trace_id="trace1", - parent_span_id=None, - task_id="t", - tracing=backend, # type: ignore[arg-type] - ) - return tracer, backend - - async def test_tool_span_derived_from_full_events(self, fake_tracer): - """AGX1-377: SpanDeriver now handles Full tool events for LangGraph. - - Full(ToolRequestContent) opens a tool span keyed by tool_call_id; - Full(ToolResponseContent) closes it. This bridges the previous gap where - LangGraph's Full-event path produced no spans, aligning it with - Start+Done harnesses (pydantic-ai, openai-agents). - """ - from langchain_core.messages import AIMessage, ToolMessage - - tracer, backend = fake_tracer - tc = {"id": "c1", "name": "get_weather", "args": {"city": "Paris"}} - ai_msg = AIMessage(content="", tool_calls=[tc]) - tool_msg = ToolMessage(content="Sunny", tool_call_id="c1", name="get_weather") - - events_raw = [ - ("updates", {"agent": {"messages": [ai_msg]}}), - ("updates", {"tools": {"messages": [tool_msg]}}), - ] - - emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=tracer) - _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] - - assert len(backend.spans_started) == 1, "Full(ToolRequestContent) opens one tool span" - started = backend.spans_started[0] - assert started["name"] == "get_weather" - assert started["input"] == {"city": "Paris"} - - async def test_no_spans_when_no_tool_calls(self, fake_tracer): - """yield_turn with tracer but no tool calls emits no spans.""" - from langchain_core.messages import AIMessage, AIMessageChunk - - tracer, backend = fake_tracer - chunk = AIMessageChunk(content="Hello!") - ai_msg = AIMessage(content="Hello!") - - events_raw = [ - ("messages", (chunk, {})), - ("updates", {"agent": {"messages": [ai_msg]}}), - ] - - emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=tracer) - _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] - - assert backend.spans_started == [], "No tool spans when there are no tool calls" - - async def test_tracer_none_means_no_spans(self): - """With tracer=False, no spans should be emitted.""" - from langchain_core.messages import AIMessage, ToolMessage - - tc = {"id": "c1", "name": "t", "args": {}} - ai_msg = AIMessage(content="", tool_calls=[tc]) - tool_msg = ToolMessage(content="ok", tool_call_id="c1", name="t") - - events_raw = [ - ("updates", {"agent": {"messages": [ai_msg]}}), - ("updates", {"tools": {"messages": [tool_msg]}}), - ] - - emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=False) - _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] - # No assertion on spans since tracer=False means emitter.tracer is None - assert emitter.tracer is None diff --git a/tests/lib/core/harness/test_harness_langgraph_async.py b/tests/lib/core/harness/test_harness_langgraph_async.py index 39bf5bc66..09e92102b 100644 --- a/tests/lib/core/harness/test_harness_langgraph_async.py +++ b/tests/lib/core/harness/test_harness_langgraph_async.py @@ -13,10 +13,10 @@ -------------- - The async handler pushes the correct sequence of messages to the fake streaming backend: Full(ToolRequest) + Full(ToolResponse) + text Start/Delta/Done. -- final_text accumulates all text (not just last segment — AGX1-377 unified behavior). +- final_text accumulates all text (not just last segment — unified behavior). - Tool messages go through streaming_task_message_context (not messages.create). -- With a SpanTracer, no tool spans are produced (AGX1-377: Full events are not - handled by SpanDeriver today). +- With a SpanTracer, Full tool events produce tool spans (request opens, response + closes), aligning LangGraph tracing with the Start+Done harnesses. What is NOT covered without live infrastructure ----------------------------------------------- @@ -46,6 +46,8 @@ from agentex.types.tool_response_content import ToolResponseContent from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn +from ._fakes import FakeTracing + # --------------------------------------------------------------------------- # Remove conftest stubs so real langchain_core types are used # --------------------------------------------------------------------------- @@ -102,30 +104,6 @@ def streaming_task_message_context(self, task_id: str, initial_content: Any, **k return ctx -# --------------------------------------------------------------------------- -# Fake tracing backend -# --------------------------------------------------------------------------- - - -class _FakeSpan: - def __init__(self, name: str) -> None: - self.name = name - self.output: Any = None - - -class _FakeTracing: - def __init__(self) -> None: - self.started: list[tuple[str, Any]] = [] - self.ended: list[tuple[str, Any]] = [] - - async def start_span(self, *, trace_id: str, name: str, **kw: Any) -> _FakeSpan: - self.started.append((name, kw.get("parent_id"))) - return _FakeSpan(name) - - async def end_span(self, *, trace_id: str, span: _FakeSpan) -> None: - self.ended.append((span.name, span.output)) - - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -142,9 +120,9 @@ async def _gen(): async def _run_auto_send_turn( stream_events: list[tuple[str, Any]], trace_id: str | None = None, -) -> tuple[TurnResult, _FakeStreaming, _FakeTracing | None]: +) -> tuple[TurnResult, _FakeStreaming, FakeTracing | None]: fake_streaming = _FakeStreaming() - fake_tracing = _FakeTracing() if trace_id else None + fake_tracing = FakeTracing() if trace_id else None tracer: SpanTracer | bool = False if trace_id and fake_tracing is not None: @@ -275,7 +253,7 @@ async def test_turn_usage_populated_after_events_consumed(self): assert usage.total_tokens == 15 async def test_tracer_produces_tool_spans_for_full_events(self): - """AGX1-377: SpanDeriver now handles Full tool events (request opens, response closes). + """SpanDeriver handles Full tool events (request opens, response closes). Full(ToolRequestContent) opens a tool span; Full(ToolResponseContent) closes it. This aligns LangGraph tracing with Start+Done harnesses (pydantic-ai, openai-agents). diff --git a/tests/lib/core/harness/test_harness_langgraph_sync.py b/tests/lib/core/harness/test_harness_langgraph_sync.py index 9f67dd2b6..67d213b6a 100644 --- a/tests/lib/core/harness/test_harness_langgraph_sync.py +++ b/tests/lib/core/harness/test_harness_langgraph_sync.py @@ -46,6 +46,8 @@ from agentex.types.tool_response_content import ToolResponseContent from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn +from ._fakes import FakeTracing + # --------------------------------------------------------------------------- # Remove conftest stubs so real langchain_core types are used # --------------------------------------------------------------------------- @@ -62,32 +64,6 @@ def _real_langchain_core(): sys.modules.update(saved) -# --------------------------------------------------------------------------- -# Fake tracing backend -# --------------------------------------------------------------------------- - - -class _FakeSpan: - def __init__(self, name: str) -> None: - self.name = name - self.output: Any = None - - -class _FakeTracing: - def __init__(self) -> None: - self.started: list[tuple[str, Any]] = [] - self.ended: list[tuple[str, Any]] = [] - - async def start_span( - self, *, trace_id: str, name: str, input: Any = None, parent_id: Any = None, **kw: Any - ) -> _FakeSpan: - self.started.append((name, parent_id)) - return _FakeSpan(name) - - async def end_span(self, *, trace_id: str, span: _FakeSpan) -> None: - self.ended.append((span.name, span.output)) - - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -103,8 +79,8 @@ async def _gen(): async def _run_yield_turn( stream_events: list[tuple[str, Any]], trace_id: str | None = None -) -> tuple[list[Any], _FakeTracing | None]: - fake_tracing = _FakeTracing() if trace_id else None +) -> tuple[list[Any], FakeTracing | None]: + fake_tracing = FakeTracing() if trace_id else None tracer: SpanTracer | bool | None = None if trace_id and fake_tracing is not None: tracer = SpanTracer(trace_id=trace_id, parent_span_id=None, task_id="task1", tracing=fake_tracing) @@ -191,7 +167,7 @@ async def test_empty_stream_yields_nothing(self): assert out == [] async def test_tracer_produces_tool_spans_for_full_events(self): - """AGX1-377: SpanDeriver now handles Full tool events (request opens, response closes). + """SpanDeriver handles Full tool events (request opens, response closes). Full(ToolRequestContent) opens a tool span; Full(ToolResponseContent) closes it. This aligns LangGraph tracing with Start+Done harnesses (pydantic-ai, openai-agents). diff --git a/tests/lib/core/harness/test_harness_langgraph_temporal.py b/tests/lib/core/harness/test_harness_langgraph_temporal.py index 1a094a33c..219e92229 100644 --- a/tests/lib/core/harness/test_harness_langgraph_temporal.py +++ b/tests/lib/core/harness/test_harness_langgraph_temporal.py @@ -1,7 +1,7 @@ """Integration test: Temporal channel with a LangGraph agent. -The Temporal LangGraph agent pattern uses ``emit_langgraph_messages`` (from -``_langgraph_messages.py``) inside a Temporal activity. That module is not +The Temporal LangGraph agent pattern uses ``emit_langgraph_messages`` (now in +``_langgraph_sync.py``) inside a Temporal activity. That helper is not yet unified onto the harness surface (it has its own Redis-streaming code). This test file verifies the LangGraph Temporal agent's streaming behavior using @@ -43,8 +43,7 @@ from agentex.lib.core.harness.emitter import UnifiedEmitter from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent -from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn -from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn, stream_langgraph_events # --------------------------------------------------------------------------- # Remove conftest stubs so real langchain_core types are used