Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 28 additions & 21 deletions examples/tutorials/00_sync/030_langgraph/README.md
Original file line number Diff line number Diff line change
@@ -1,43 +1,50 @@
# Tutorial 030: Sync LangGraph Agent
# Tutorial: Sync LangGraph Agent

This tutorial demonstrates how to build a **synchronous** LangGraph agent on AgentEx with:
- Tool calling (ReAct pattern)
- Streaming token output
- Multi-turn conversation memory via AgentEx checkpointer
- Tracing integration
This tutorial demonstrates how to build a **synchronous** LangGraph agent on AgentEx
using the **unified harness surface**:

## Graph Structure
```python
turn = LangGraphTurn(stream, model=None)
emitter = UnifiedEmitter(task_id=task_id, trace_id=task_id, ...)
async for event in emitter.yield_turn(turn):
yield event
```

![Graph](graph.png)
The `LangGraphTurn` + `UnifiedEmitter` path replaces calling the lower-level
``convert_langgraph_to_agentex_events`` helper directly.

## Key Concepts

### Sync ACP
The sync ACP model uses HTTP request/response for communication. The `@acp.on_message_send` handler receives a message and yields streaming events back to the client.
### Unified Harness

`LangGraphTurn` implements the `HarnessTurn` protocol: it wraps the raw
LangGraph `astream()` generator and exposes `events` (an async generator of
`TaskMessageUpdate`) and `usage()` (token counts captured from the final
`AIMessage`).

`UnifiedEmitter.yield_turn(turn)` iterates the turn's events and yields them
to the sync ACP handler unchanged. The same `LangGraphTurn` object can also be
passed to `UnifiedEmitter.auto_send_turn` in the async/temporal channels.

### LangGraph Integration
- **StateGraph**: Defines the agent's state machine with `AgentState` (message history)
- **ToolNode**: Automatically executes tool calls from the LLM
- **tools_condition**: Routes between tool execution and final response
- **Checkpointer**: Uses AgentEx's HTTP checkpointer for cross-request memory
### AGX1-377 Note

### Streaming
The agent streams tokens as they're generated using `convert_langgraph_to_agentex_events()`, which converts LangGraph's stream events into AgentEx `TaskMessageUpdate` events.
LangGraph emits tool requests as `StreamTaskMessageFull` events (from "updates"
node outputs). The `SpanDeriver` does not open tool spans from Full events
today; that gap is tracked in AGX1-373.

## Files

| File | Description |
|------|-------------|
| `project/acp.py` | ACP server and message handler |
| `project/graph.py` | LangGraph state graph definition |
| `project/acp.py` | ACP server using unified harness (LangGraphTurn + yield_turn) |
| `project/graph.py` | LangGraph state graph (weather example) |
| `project/tools.py` | Tool definitions (weather example) |
| `tests/test_agent.py` | Integration tests |
| `manifest.yaml` | Agent configuration |
| `manifest.yaml` | Agent configuration (name: s030-langgraph) |

## Running Locally

```bash
# From this directory
agentex agents run
```

Expand Down
Binary file not shown.
4 changes: 2 additions & 2 deletions examples/tutorials/00_sync/030_langgraph/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ local_development:
agent:
acp_type: sync
name: s030-langgraph
description: A sync LangGraph agent with tool calling and streaming
description: A sync LangGraph agent using the unified harness surface (LangGraphTurn + UnifiedEmitter.yield_turn)

temporal:
enabled: false
Expand Down Expand Up @@ -47,7 +47,7 @@ deployment:
global:
agent:
name: "s030-langgraph"
description: "A sync LangGraph agent with tool calling and streaming"
description: "A sync LangGraph agent using the unified harness surface"
replicaCount: 1
resources:
requests:
Expand Down
65 changes: 39 additions & 26 deletions examples/tutorials/00_sync/030_langgraph/project/acp.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
"""
ACP (Agent Communication Protocol) handler for Agentex.

This is the API layer — it manages the graph lifecycle and streams
tokens and tool calls from the LangGraph graph to the Agentex frontend.
"""ACP handler for the sync LangGraph agent.

Uses the unified harness surface: ``LangGraphTurn`` wraps the LangGraph
``astream()`` generator, and ``UnifiedEmitter.yield_turn`` converts it into
the AgentEx ``TaskMessageUpdate`` event stream expected by the sync ACP.

Properties of the unified surface:
- Tracing is wired through the tracing manager (no bespoke handler boilerplate).
- No manual text-delta accumulation for the span output.
- Tool calls are emitted as ``StreamTaskMessageFull`` (not Start+Delta+Done)
via the same code path as the async/temporal channels.
- Usage data (token counts) is captured on the ``LangGraphTurn`` object and
can be read after the turn completes.

AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull``
events (from "updates"). The ``SpanDeriver`` does not open tool spans from
Full events today; that gap is tracked in AGX1-373.
"""

from __future__ import annotations
Expand All @@ -16,29 +28,29 @@

import agentex.lib.adk as adk
from project.graph import create_graph
from agentex.lib.adk import create_langgraph_tracing_handler, convert_langgraph_to_agentex_events
from agentex.lib.types.acp import SendMessageParams
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.core.harness.emitter import UnifiedEmitter
from agentex.types.task_message_delta import TextDelta
from agentex.types.task_message_update import TaskMessageUpdate
from agentex.types.task_message_content import TaskMessageContent
from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config

logger = make_logger(__name__)

# Register the Agentex tracing processor so spans are shipped to the backend
add_tracing_processor_config(
SGPTracingProcessorConfig(
sgp_api_key=os.environ.get("SGP_API_KEY", ""),
sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""),
sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""),
))
# Create ACP server
)
)

acp = FastACP.create(acp_type="sync")

# Compiled graph (lazy-initialized on first request)
_graph = None


Expand All @@ -54,41 +66,42 @@ async def get_graph():
async def handle_message_send(
params: SendMessageParams,
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
"""Handle incoming messages from Agentex, streaming tokens and tool calls."""
"""Handle incoming messages, streaming tokens and tool calls via unified harness."""
graph = await get_graph()

thread_id = params.task.id
task_id = params.task.id
user_message = params.content.content

logger.info(f"Processing message for thread {thread_id}")
logger.info(f"Processing message for task {task_id}")

async with adk.tracing.span(
trace_id=thread_id,
trace_id=task_id,
task_id=task_id,
name="message",
input={"message": user_message},
data={"__span_type__": "AGENT_WORKFLOW"},
) as turn_span:
callback = create_langgraph_tracing_handler(
trace_id=thread_id,
parent_span_id=turn_span.id if turn_span else None,
)

stream = graph.astream(
{"messages": [{"role": "user", "content": user_message}]},
config={
"configurable": {"thread_id": thread_id},
"callbacks": [callback],
},
config={"configurable": {"thread_id": task_id}},
stream_mode=["messages", "updates"],
)

turn = LangGraphTurn(stream, model=None)
emitter = UnifiedEmitter(
task_id=task_id,
trace_id=task_id,
parent_span_id=turn_span.id if turn_span else None,
)

final_text = ""
async for event in convert_langgraph_to_agentex_events(stream):
# Accumulate text deltas for span output
async for event in emitter.yield_turn(turn):
# Accumulate text deltas so the span's final_output is the assistant
# text (matching the async tutorial), not the usage metrics.
delta = getattr(event, "delta", None)
if isinstance(delta, TextDelta) and delta.text_delta:
final_text += delta.text_delta
yield event

if turn_span:
turn_span.output = {"final_output": final_text}
turn_span.output = {"final_output": final_text, "usage": turn.usage().model_dump()}
18 changes: 6 additions & 12 deletions examples/tutorials/00_sync/030_langgraph/project/graph.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
"""
LangGraph graph definition.
"""LangGraph graph definition for the 030_langgraph sync agent.

Defines the state, nodes, edges, and compiles the graph.
The compiled graph is the boundary between this module and the API layer.
Identical to ``030_langgraph/project/graph.py`` — the graph definition is not
affected by the harness migration. Only ``acp.py`` changes.
"""

from __future__ import annotations
Expand Down Expand Up @@ -35,15 +34,12 @@

class AgentState(TypedDict):
"""State schema for the agent graph."""

messages: Annotated[list[Any], add_messages]


async def create_graph():
"""Create and compile the agent graph with checkpointer.

Returns:
A compiled LangGraph StateGraph ready for invocation.
"""
"""Create and compile the agent graph with checkpointer."""
llm = ChatOpenAI(
model=MODEL_NAME,
reasoning={"effort": "high", "summary": "auto"},
Expand All @@ -56,9 +52,7 @@ def agent_node(state: AgentState) -> dict[str, Any]:
"""Process the current state and generate a response."""
messages = state["messages"]
if not messages or not isinstance(messages[0], SystemMessage):
system_content = SYSTEM_PROMPT.format(
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
system_content = SYSTEM_PROMPT.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
messages = [SystemMessage(content=system_content)] + messages
response = llm_with_tools.invoke(messages)
return {"messages": [response]}
Expand Down
10 changes: 1 addition & 9 deletions examples/tutorials/00_sync/030_langgraph/project/tools.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
"""
Tool definitions for the LangGraph agent.

Add your custom tools here. Each tool should be a function decorated with @tool
or created using the Tool class.
"""
"""Tool definitions for the 030_langgraph sync agent."""

from langchain_core.tools import Tool

Expand All @@ -17,16 +12,13 @@ def get_weather(city: str) -> str:
Returns:
A string describing the weather conditions.
"""
# TODO: Replace with actual weather API call
return f"The weather in {city} is sunny and 72°F"


# Define tools
weather_tool = Tool(
name="get_weather",
func=get_weather,
description="Get the current weather for a city. Input should be a city name.",
)

# Export all tools as a list
TOOLS = [weather_tool]
2 changes: 1 addition & 1 deletion examples/tutorials/00_sync/030_langgraph/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "hatchling.build"
[project]
name = "s030-langgraph"
version = "0.1.0"
description = "A sync LangGraph agent with tool calling and streaming"
description = "A sync LangGraph agent using the unified harness surface"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
Expand Down
Loading
Loading