Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/agentex/lib/adk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events
from agentex.lib.adk._modules._langgraph_messages import emit_langgraph_messages
from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events
from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn
from agentex.lib.adk._modules._pydantic_ai_async import stream_pydantic_ai_events
from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events
from agentex.lib.adk._modules._pydantic_ai_tracing import create_pydantic_ai_tracing_handler
from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn
from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events
from agentex.lib.adk._modules._claude_code_turn import (
ClaudeCodeTurn,
Expand Down Expand Up @@ -70,10 +72,12 @@
"stream_langgraph_events",
"emit_langgraph_messages",
"convert_langgraph_to_agentex_events",
"LangGraphTurn",
# Pydantic AI
"stream_pydantic_ai_events",
"convert_pydantic_ai_to_agentex_events",
"create_pydantic_ai_tracing_handler",
"PydanticAITurn",
# Claude Code
"convert_claude_code_to_agentex_events",
"ClaudeCodeTurn",
Expand Down
24 changes: 12 additions & 12 deletions src/agentex/lib/cli/templates/default-langgraph/project/acp.py.j2
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ if _litellm_key:
os.environ["OPENAI_API_KEY"] = _litellm_key

import agentex.lib.adk as adk
from agentex.lib.adk import create_langgraph_tracing_handler, stream_langgraph_events
from agentex.lib.core.harness import UnifiedEmitter
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.protocol.acp import SendEventParams, CancelTaskParams, CreateTaskParams
from agentex.lib.types.fastacp import AsyncACPConfig
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.adk import LangGraphTurn

from project.graph import create_graph

Expand Down Expand Up @@ -67,24 +68,23 @@ async def handle_task_event_send(params: SendEventParams):
input={"message": user_message},
data={"__span_type__": "AGENT_WORKFLOW"},
) as turn_span:
callback = create_langgraph_tracing_handler(
trace_id=task_id,
parent_span_id=turn_span.id if turn_span else None,
)

stream = graph.astream(
{"messages": [{"role": "user", "content": user_message}]},
config={
"configurable": {"thread_id": task_id},
"callbacks": [callback],
},
config={"configurable": {"thread_id": task_id}},
stream_mode=["messages", "updates"],
)

final_output = await stream_langgraph_events(stream, task_id)
turn = LangGraphTurn(stream, model=None)
emitter = UnifiedEmitter(
task_id=task_id,
trace_id=task_id,
parent_span_id=turn_span.id if turn_span else None,
)

result = await emitter.auto_send_turn(turn)

if turn_span:
turn_span.output = {"final_output": final_output}
turn_span.output = {"final_output": result.final_text}


@acp.on_task_create
Expand Down
23 changes: 11 additions & 12 deletions src/agentex/lib/cli/templates/default-pydantic-ai/project/acp.py.j2
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,19 @@ from dotenv import load_dotenv

load_dotenv()

from project.agent import create_agent
from project.agent import MODEL_NAME, create_agent
from pydantic_ai.run import AgentRunResultEvent
from pydantic_ai.messages import ModelMessagesTypeAdapter

import agentex.lib.adk as adk
from agentex.lib.adk import (
stream_pydantic_ai_events,
create_pydantic_ai_tracing_handler,
)
from agentex.protocol.acp import SendEventParams, CancelTaskParams, CreateTaskParams
from agentex.lib.core.harness import UnifiedEmitter
from agentex.lib.types.fastacp import AsyncACPConfig
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.utils.model_utils import BaseModel
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.adk import PydanticAITurn
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config

logger = make_logger(__name__)
Expand Down Expand Up @@ -125,15 +123,17 @@ async def handle_task_event_send(params: SendEventParams):
input={"message": user_message},
data={"__span_type__": "AGENT_WORKFLOW"},
) as turn_span:
tracing_handler = create_pydantic_ai_tracing_handler(
# Construct the UnifiedEmitter from the ACP context so tracing is
# automatic and messages are auto-sent to the task stream (Redis).
emitter = UnifiedEmitter(
task_id=task_id,
trace_id=task_id,
parent_span_id=turn_span.id if turn_span else None,
task_id=task_id,
)

# Wrap the pydantic-ai event stream so we can capture the final
# AgentRunResultEvent (which carries the full message list for the
# next turn) without changing the streaming-helper's signature.
# next turn) before forwarding events to the emitter.
captured_messages: list[Any] = []

async def tee_messages(upstream) -> AsyncIterator[Any]:
Expand All @@ -143,9 +143,8 @@ async def handle_task_event_send(params: SendEventParams):
yield event

async with agent.run_stream_events(user_message, message_history=previous_messages) as stream:
final_output = await stream_pydantic_ai_events(
tee_messages(stream), task_id, tracing_handler=tracing_handler
)
turn = PydanticAITurn(tee_messages(stream), model=MODEL_NAME)
result = await emitter.auto_send_turn(turn)

# Save the updated message history so the next turn picks up here.
if captured_messages:
Expand All @@ -158,7 +157,7 @@ async def handle_task_event_send(params: SendEventParams):
)

if turn_span:
turn_span.output = {"final_output": final_output}
turn_span.output = {"final_output": result.final_text}


@acp.on_task_cancel
Expand Down
22 changes: 11 additions & 11 deletions src/agentex/lib/cli/templates/sync-langgraph/project/acp.py.j2
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ tokens and tool calls from the LangGraph graph to the Agentex frontend.
from typing import AsyncGenerator

import agentex.lib.adk as adk
from agentex.lib.adk import create_langgraph_tracing_handler, convert_langgraph_to_agentex_events
from agentex.lib.core.harness import UnifiedEmitter
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.protocol.acp import SendMessageParams
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.adk import LangGraphTurn
from agentex.types.task_message_content import TaskMessageContent
from agentex.types.task_message_delta import TextDelta
from agentex.types.task_message_update import TaskMessageUpdate
Expand Down Expand Up @@ -72,22 +73,21 @@ async def handle_message_send(
input={"message": user_message},
data={"__span_type__": "AGENT_WORKFLOW"},
) as turn_span:
callback = create_langgraph_tracing_handler(
trace_id=thread_id,
parent_span_id=turn_span.id if turn_span else None,
)

stream = graph.astream(
{"messages": [{"role": "user", "content": user_message}]},
config={
"configurable": {"thread_id": thread_id},
"callbacks": [callback],
},
config={"configurable": {"thread_id": thread_id}},
stream_mode=["messages", "updates"],
)

turn = LangGraphTurn(stream, model=None)
emitter = UnifiedEmitter(
task_id=thread_id,
trace_id=thread_id,
parent_span_id=turn_span.id if turn_span else None,
)

final_text = ""
async for event in convert_langgraph_to_agentex_events(stream):
async for event in emitter.yield_turn(turn):
# Accumulate text deltas for span output
delta = getattr(event, "delta", None)
if isinstance(delta, TextDelta) and delta.text_delta:
Expand Down
23 changes: 11 additions & 12 deletions src/agentex/lib/cli/templates/sync-pydantic-ai/project/acp.py.j2
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@ from dotenv import load_dotenv

load_dotenv()

from project.agent import create_agent
from project.agent import MODEL_NAME, create_agent

import agentex.lib.adk as adk
from agentex.lib.adk import (
create_pydantic_ai_tracing_handler,
convert_pydantic_ai_to_agentex_events,
)
from agentex.protocol.acp import SendMessageParams
from agentex.lib.core.harness import UnifiedEmitter
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.types.task_message_update import TaskMessageUpdate
from agentex.types.task_message_content import TaskMessageContent
from agentex.lib.adk import PydanticAITurn
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config

logger = make_logger(__name__)
Expand Down Expand Up @@ -73,21 +71,22 @@ async def handle_message_send(
logger.info(f"Processing message for task {task_id}")

# Open a per-message turn span. Tool calls below nest underneath this
# span via the tracing handler's parent_span_id wiring.
# span via the emitter's parent_span_id wiring.
async with adk.tracing.span(
trace_id=task_id,
task_id=task_id,
name="message",
input={"message": user_message},
data={"__span_type__": "AGENT_WORKFLOW"},
) as turn_span:
tracing_handler = create_pydantic_ai_tracing_handler(
# Construct the UnifiedEmitter from the ACP/streaming context so tracing
# is automatic: tool spans nest under this turn's span.
emitter = UnifiedEmitter(
task_id=task_id,
trace_id=task_id,
parent_span_id=turn_span.id if turn_span else None,
task_id=task_id,
)
async with agent.run_stream_events(user_message) as stream:
async for event in convert_pydantic_ai_to_agentex_events(
stream, tracing_handler=tracing_handler
):
yield event
turn = PydanticAITurn(stream, model=MODEL_NAME)
async for ev in emitter.yield_turn(turn):
yield ev
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ moves into recorded activities.

Streaming back to Agentex happens via ``event_stream_handler``, which
receives Pydantic AI ``AgentStreamEvent``s from inside the model activity
and forwards them to Redis using the ``stream_pydantic_ai_events`` helper.
The ``task_id`` and tracing parent span ID are threaded into the handler
via ``deps``.
and forwards them through the unified harness surface
(``UnifiedEmitter.auto_send_turn`` + ``PydanticAITurn``). The ``task_id`` and
tracing parent span ID are threaded into the handler via ``deps``.
"""

from __future__ import annotations
Expand All @@ -27,10 +27,8 @@ from project.tools import get_weather
from pydantic_ai.messages import AgentStreamEvent
from pydantic_ai.durable_exec.temporal import TemporalAgent

from agentex.lib.adk import (
stream_pydantic_ai_events,
create_pydantic_ai_tracing_handler,
)
from agentex.lib.core.harness import UnifiedEmitter
from agentex.lib.adk import PydanticAITurn

# Swap this for any Pydantic AI-supported model identifier
# (e.g. "anthropic:claude-3-5-sonnet-latest", "openai:gpt-4o").
Expand Down Expand Up @@ -92,17 +90,18 @@ async def event_handler(
activity (not the workflow), it can freely make non-deterministic Redis
writes β€” including the tracing HTTP calls that record per-tool-call
spans under the workflow's per-turn span (when ``parent_span_id`` is set).

The UnifiedEmitter is constructed from ``deps`` (task_id + parent_span_id),
so tool spans nest under the workflow's per-turn span and messages auto-send
to the task stream.
"""
tracing_handler = create_pydantic_ai_tracing_handler(
emitter = UnifiedEmitter(
task_id=run_context.deps.task_id,
trace_id=run_context.deps.task_id,
parent_span_id=run_context.deps.parent_span_id,
task_id=run_context.deps.task_id,
)
await stream_pydantic_ai_events(
events,
run_context.deps.task_id,
tracing_handler=tracing_handler,
)
turn = PydanticAITurn(events, model=MODEL_NAME)
await emitter.auto_send_turn(turn)
Comment thread
greptile-apps[bot] marked this conversation as resolved.


# Construct the durable agent at module load time so that the
Expand Down
Loading