From 758d5f8d47bd124120894599a317da80a38140a3 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 09:51:04 -0400 Subject: [PATCH] refactor(cli): migrate existing langgraph/pydantic-ai templates to unified surface Update the scaffolded acp.py / agent.py for default-langgraph, default-pydantic-ai, sync-langgraph, sync-pydantic-ai and temporal-pydantic-ai to the canonical unified harness surface, matching the migrated tutorials. Promote LangGraphTurn and PydanticAITurn to the public agentex.lib.adk facade (parallel to ClaudeCodeTurn / CodexTurn) so the scaffolded user code imports them from the stable public path instead of a private _modules path. Non-breaking: template scaffolding + additive public exports only. The unified surface already exists; deprecated tracing handlers are removed in follow-ups. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/agentex/lib/adk/__init__.py | 4 +++ .../default-langgraph/project/acp.py.j2 | 24 ++++++++--------- .../default-pydantic-ai/project/acp.py.j2 | 23 ++++++++-------- .../sync-langgraph/project/acp.py.j2 | 22 +++++++-------- .../sync-pydantic-ai/project/acp.py.j2 | 23 ++++++++-------- .../temporal-pydantic-ai/project/agent.py.j2 | 27 +++++++++---------- 6 files changed, 62 insertions(+), 61 deletions(-) diff --git a/src/agentex/lib/adk/__init__.py b/src/agentex/lib/adk/__init__.py index fedd52f7a..e618a20d3 100644 --- a/src/agentex/lib/adk/__init__.py +++ b/src/agentex/lib/adk/__init__.py @@ -10,9 +10,11 @@ from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events from agentex.lib.adk._modules._langgraph_messages import emit_langgraph_messages from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn from agentex.lib.adk._modules._pydantic_ai_async import stream_pydantic_ai_events from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events from agentex.lib.adk._modules._pydantic_ai_tracing import create_pydantic_ai_tracing_handler +from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events from agentex.lib.adk._modules._claude_code_turn import ( ClaudeCodeTurn, @@ -70,10 +72,12 @@ "stream_langgraph_events", "emit_langgraph_messages", "convert_langgraph_to_agentex_events", + "LangGraphTurn", # Pydantic AI "stream_pydantic_ai_events", "convert_pydantic_ai_to_agentex_events", "create_pydantic_ai_tracing_handler", + "PydanticAITurn", # Claude Code "convert_claude_code_to_agentex_events", "ClaudeCodeTurn", diff --git a/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 b/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 index 3309dc07e..750a271ad 100644 --- a/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2 @@ -15,13 +15,14 @@ if _litellm_key: os.environ["OPENAI_API_KEY"] = _litellm_key import agentex.lib.adk as adk -from agentex.lib.adk import create_langgraph_tracing_handler, stream_langgraph_events +from agentex.lib.core.harness import UnifiedEmitter from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config from agentex.lib.sdk.fastacp.fastacp import FastACP from agentex.protocol.acp import SendEventParams, CancelTaskParams, CreateTaskParams from agentex.lib.types.fastacp import AsyncACPConfig from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger +from agentex.lib.adk import LangGraphTurn from project.graph import create_graph @@ -67,24 +68,23 @@ async def handle_task_event_send(params: SendEventParams): input={"message": user_message}, data={"__span_type__": "AGENT_WORKFLOW"}, ) as turn_span: - callback = create_langgraph_tracing_handler( - trace_id=task_id, - parent_span_id=turn_span.id if turn_span else None, - ) - stream = graph.astream( {"messages": [{"role": "user", "content": user_message}]}, - config={ - "configurable": {"thread_id": task_id}, - "callbacks": [callback], - }, + config={"configurable": {"thread_id": task_id}}, stream_mode=["messages", "updates"], ) - final_output = await stream_langgraph_events(stream, task_id) + turn = LangGraphTurn(stream, model=None) + emitter = UnifiedEmitter( + task_id=task_id, + trace_id=task_id, + parent_span_id=turn_span.id if turn_span else None, + ) + + result = await emitter.auto_send_turn(turn) if turn_span: - turn_span.output = {"final_output": final_output} + turn_span.output = {"final_output": result.final_text} @acp.on_task_create diff --git a/src/agentex/lib/cli/templates/default-pydantic-ai/project/acp.py.j2 b/src/agentex/lib/cli/templates/default-pydantic-ai/project/acp.py.j2 index 5692396b2..11d3ab476 100644 --- a/src/agentex/lib/cli/templates/default-pydantic-ai/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/default-pydantic-ai/project/acp.py.j2 @@ -19,21 +19,19 @@ from dotenv import load_dotenv load_dotenv() -from project.agent import create_agent +from project.agent import MODEL_NAME, create_agent from pydantic_ai.run import AgentRunResultEvent from pydantic_ai.messages import ModelMessagesTypeAdapter import agentex.lib.adk as adk -from agentex.lib.adk import ( - stream_pydantic_ai_events, - create_pydantic_ai_tracing_handler, -) from agentex.protocol.acp import SendEventParams, CancelTaskParams, CreateTaskParams +from agentex.lib.core.harness import UnifiedEmitter from agentex.lib.types.fastacp import AsyncACPConfig from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger from agentex.lib.utils.model_utils import BaseModel from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.adk import PydanticAITurn from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config logger = make_logger(__name__) @@ -125,15 +123,17 @@ async def handle_task_event_send(params: SendEventParams): input={"message": user_message}, data={"__span_type__": "AGENT_WORKFLOW"}, ) as turn_span: - tracing_handler = create_pydantic_ai_tracing_handler( + # Construct the UnifiedEmitter from the ACP context so tracing is + # automatic and messages are auto-sent to the task stream (Redis). + emitter = UnifiedEmitter( + task_id=task_id, trace_id=task_id, parent_span_id=turn_span.id if turn_span else None, - task_id=task_id, ) # Wrap the pydantic-ai event stream so we can capture the final # AgentRunResultEvent (which carries the full message list for the - # next turn) without changing the streaming-helper's signature. + # next turn) before forwarding events to the emitter. captured_messages: list[Any] = [] async def tee_messages(upstream) -> AsyncIterator[Any]: @@ -143,9 +143,8 @@ async def handle_task_event_send(params: SendEventParams): yield event async with agent.run_stream_events(user_message, message_history=previous_messages) as stream: - final_output = await stream_pydantic_ai_events( - tee_messages(stream), task_id, tracing_handler=tracing_handler - ) + turn = PydanticAITurn(tee_messages(stream), model=MODEL_NAME) + result = await emitter.auto_send_turn(turn) # Save the updated message history so the next turn picks up here. if captured_messages: @@ -158,7 +157,7 @@ async def handle_task_event_send(params: SendEventParams): ) if turn_span: - turn_span.output = {"final_output": final_output} + turn_span.output = {"final_output": result.final_text} @acp.on_task_cancel diff --git a/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 b/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 index 54538d0c9..c6814b9c4 100644 --- a/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2 @@ -8,12 +8,13 @@ tokens and tool calls from the LangGraph graph to the Agentex frontend. from typing import AsyncGenerator import agentex.lib.adk as adk -from agentex.lib.adk import create_langgraph_tracing_handler, convert_langgraph_to_agentex_events +from agentex.lib.core.harness import UnifiedEmitter from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config from agentex.lib.sdk.fastacp.fastacp import FastACP from agentex.protocol.acp import SendMessageParams from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger +from agentex.lib.adk import LangGraphTurn from agentex.types.task_message_content import TaskMessageContent from agentex.types.task_message_delta import TextDelta from agentex.types.task_message_update import TaskMessageUpdate @@ -72,22 +73,21 @@ async def handle_message_send( input={"message": user_message}, data={"__span_type__": "AGENT_WORKFLOW"}, ) as turn_span: - callback = create_langgraph_tracing_handler( - trace_id=thread_id, - parent_span_id=turn_span.id if turn_span else None, - ) - stream = graph.astream( {"messages": [{"role": "user", "content": user_message}]}, - config={ - "configurable": {"thread_id": thread_id}, - "callbacks": [callback], - }, + config={"configurable": {"thread_id": thread_id}}, stream_mode=["messages", "updates"], ) + turn = LangGraphTurn(stream, model=None) + emitter = UnifiedEmitter( + task_id=thread_id, + trace_id=thread_id, + parent_span_id=turn_span.id if turn_span else None, + ) + final_text = "" - async for event in convert_langgraph_to_agentex_events(stream): + async for event in emitter.yield_turn(turn): # Accumulate text deltas for span output delta = getattr(event, "delta", None) if isinstance(delta, TextDelta) and delta.text_delta: diff --git a/src/agentex/lib/cli/templates/sync-pydantic-ai/project/acp.py.j2 b/src/agentex/lib/cli/templates/sync-pydantic-ai/project/acp.py.j2 index 4925e847f..061ae0e08 100644 --- a/src/agentex/lib/cli/templates/sync-pydantic-ai/project/acp.py.j2 +++ b/src/agentex/lib/cli/templates/sync-pydantic-ai/project/acp.py.j2 @@ -15,19 +15,17 @@ from dotenv import load_dotenv load_dotenv() -from project.agent import create_agent +from project.agent import MODEL_NAME, create_agent import agentex.lib.adk as adk -from agentex.lib.adk import ( - create_pydantic_ai_tracing_handler, - convert_pydantic_ai_to_agentex_events, -) from agentex.protocol.acp import SendMessageParams +from agentex.lib.core.harness import UnifiedEmitter from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger from agentex.lib.sdk.fastacp.fastacp import FastACP from agentex.types.task_message_update import TaskMessageUpdate from agentex.types.task_message_content import TaskMessageContent +from agentex.lib.adk import PydanticAITurn from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config logger = make_logger(__name__) @@ -73,7 +71,7 @@ async def handle_message_send( logger.info(f"Processing message for task {task_id}") # Open a per-message turn span. Tool calls below nest underneath this - # span via the tracing handler's parent_span_id wiring. + # span via the emitter's parent_span_id wiring. async with adk.tracing.span( trace_id=task_id, task_id=task_id, @@ -81,13 +79,14 @@ async def handle_message_send( input={"message": user_message}, data={"__span_type__": "AGENT_WORKFLOW"}, ) as turn_span: - tracing_handler = create_pydantic_ai_tracing_handler( + # Construct the UnifiedEmitter from the ACP/streaming context so tracing + # is automatic: tool spans nest under this turn's span. + emitter = UnifiedEmitter( + task_id=task_id, trace_id=task_id, parent_span_id=turn_span.id if turn_span else None, - task_id=task_id, ) async with agent.run_stream_events(user_message) as stream: - async for event in convert_pydantic_ai_to_agentex_events( - stream, tracing_handler=tracing_handler - ): - yield event + turn = PydanticAITurn(stream, model=MODEL_NAME) + async for ev in emitter.yield_turn(turn): + yield ev diff --git a/src/agentex/lib/cli/templates/temporal-pydantic-ai/project/agent.py.j2 b/src/agentex/lib/cli/templates/temporal-pydantic-ai/project/agent.py.j2 index 0aa958118..da97856ea 100644 --- a/src/agentex/lib/cli/templates/temporal-pydantic-ai/project/agent.py.j2 +++ b/src/agentex/lib/cli/templates/temporal-pydantic-ai/project/agent.py.j2 @@ -11,9 +11,9 @@ moves into recorded activities. Streaming back to Agentex happens via ``event_stream_handler``, which receives Pydantic AI ``AgentStreamEvent``s from inside the model activity -and forwards them to Redis using the ``stream_pydantic_ai_events`` helper. -The ``task_id`` and tracing parent span ID are threaded into the handler -via ``deps``. +and forwards them through the unified harness surface +(``UnifiedEmitter.auto_send_turn`` + ``PydanticAITurn``). The ``task_id`` and +tracing parent span ID are threaded into the handler via ``deps``. """ from __future__ import annotations @@ -27,10 +27,8 @@ from project.tools import get_weather from pydantic_ai.messages import AgentStreamEvent from pydantic_ai.durable_exec.temporal import TemporalAgent -from agentex.lib.adk import ( - stream_pydantic_ai_events, - create_pydantic_ai_tracing_handler, -) +from agentex.lib.core.harness import UnifiedEmitter +from agentex.lib.adk import PydanticAITurn # Swap this for any Pydantic AI-supported model identifier # (e.g. "anthropic:claude-3-5-sonnet-latest", "openai:gpt-4o"). @@ -92,17 +90,18 @@ async def event_handler( activity (not the workflow), it can freely make non-deterministic Redis writes — including the tracing HTTP calls that record per-tool-call spans under the workflow's per-turn span (when ``parent_span_id`` is set). + + The UnifiedEmitter is constructed from ``deps`` (task_id + parent_span_id), + so tool spans nest under the workflow's per-turn span and messages auto-send + to the task stream. """ - tracing_handler = create_pydantic_ai_tracing_handler( + emitter = UnifiedEmitter( + task_id=run_context.deps.task_id, trace_id=run_context.deps.task_id, parent_span_id=run_context.deps.parent_span_id, - task_id=run_context.deps.task_id, - ) - await stream_pydantic_ai_events( - events, - run_context.deps.task_id, - tracing_handler=tracing_handler, ) + turn = PydanticAITurn(events, model=MODEL_NAME) + await emitter.auto_send_turn(turn) # Construct the durable agent at module load time so that the