Skip to content

Commit eee3320

Browse files
declan-scaleclaude
andcommitted
fix(claude-code): run Claude Code CLI in a Temporal activity, not workflow code
The temporal tutorial spawned the CLI subprocess directly in the workflow signal handler. Temporal runs signal-handler bodies on its deterministic sandbox event loop, which does not implement asyncio.create_subprocess_exec — so the worker crashed with NotImplementedError and no tool_request/response was ever produced. (Replay was never the issue; the sandboxed loop is.) Move the subprocess + ClaudeCodeTurn + UnifiedEmitter.auto_send_turn into a new run_claude_code_turn activity (project/activities.py), register it on the worker, and have the signal handler delegate via workflow.execute_activity. The activity returns final_text + session_id so multi-turn resume still works. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 80aabc8 commit eee3320

3 files changed

Lines changed: 182 additions & 114 deletions

File tree

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
"""Temporal activity for the Claude Code tutorial.
2+
3+
Subprocess spawning (and any other I/O) must run inside a Temporal *activity*,
4+
not in workflow code. Temporal runs workflow + signal-handler bodies on a
5+
deterministic sandbox event loop that does not implement ``subprocess_exec``
6+
(or threads / sockets), so spawning the CLI directly in the signal handler
7+
raises ``NotImplementedError``. This activity runs the Claude Code CLI, drives
8+
the ``ClaudeCodeTurn`` through ``UnifiedEmitter.auto_send_turn`` (the async
9+
Redis push path), and returns the turn result to the workflow.
10+
11+
The ``_spawn_claude`` async generator is an injectable seam: offline tests
12+
provide a fake that yields pre-recorded stdout lines so no real CLI runs.
13+
"""
14+
15+
from __future__ import annotations
16+
17+
import asyncio
18+
from typing import Any, AsyncIterator
19+
from datetime import datetime
20+
21+
from temporalio import activity
22+
23+
from agentex.lib.adk import ClaudeCodeTurn
24+
from agentex.lib.core.harness import UnifiedEmitter
25+
from agentex.lib.utils.logging import make_logger
26+
from agentex.lib.utils.model_utils import BaseModel
27+
28+
logger = make_logger(__name__)
29+
30+
RUN_CLAUDE_CODE_TURN_ACTIVITY = "run_claude_code_turn"
31+
32+
33+
class RunClaudeCodeTurnParams(BaseModel):
34+
"""Arguments for one Claude Code turn run inside an activity."""
35+
36+
task_id: str
37+
prompt: str
38+
trace_id: str | None = None
39+
parent_span_id: str | None = None
40+
session_id: str | None = None
41+
created_at: datetime | None = None
42+
43+
44+
class RunClaudeCodeTurnResult(BaseModel):
45+
"""Result returned from the activity to the workflow."""
46+
47+
final_text: str
48+
session_id: str | None = None
49+
50+
51+
async def _spawn_claude(prompt: str, session_id: str | None = None) -> AsyncIterator[str]:
52+
"""Spawn ``claude -p --output-format stream-json`` locally and yield stdout lines.
53+
54+
Pass ``session_id`` to resume a previous Claude Code session (multi-turn
55+
memory via ``-r <session_id>``).
56+
57+
Injectable seam: tests monkeypatch this with a fake async iterator so no
58+
real CLI invocation is needed offline.
59+
"""
60+
cmd = [
61+
"claude",
62+
"-p",
63+
"--output-format",
64+
"stream-json",
65+
"--verbose",
66+
]
67+
if session_id:
68+
cmd.extend(["-r", session_id])
69+
70+
proc = await asyncio.create_subprocess_exec(
71+
*cmd,
72+
stdin=asyncio.subprocess.PIPE,
73+
stdout=asyncio.subprocess.PIPE,
74+
stderr=asyncio.subprocess.PIPE,
75+
)
76+
assert proc.stdout is not None
77+
assert proc.stdin is not None
78+
79+
proc.stdin.write(prompt.encode())
80+
proc.stdin.close()
81+
82+
# Drain stderr concurrently. With --verbose, Claude Code can write enough to
83+
# stderr to fill the OS pipe buffer; if we only read stdout, the CLI blocks
84+
# on its stderr write while we block reading stdout — a deadlock. A
85+
# background task keeps stderr flowing so stdout never stalls.
86+
async def _drain_stderr() -> None:
87+
assert proc.stderr is not None
88+
async for _ in proc.stderr:
89+
pass
90+
91+
stderr_task = asyncio.create_task(_drain_stderr())
92+
93+
try:
94+
buffer = ""
95+
async for chunk in proc.stdout:
96+
buffer += chunk.decode("utf-8", errors="replace")
97+
while "\n" in buffer:
98+
line, buffer = buffer.split("\n", 1)
99+
line = line.strip()
100+
if line:
101+
yield line
102+
103+
if buffer.strip():
104+
yield buffer.strip()
105+
106+
await proc.wait()
107+
finally:
108+
# Release the subprocess and stderr drain task even if the consumer
109+
# abandons the generator early (task cancellation / client disconnect):
110+
# cancel the drain task and terminate+reap the process if it is still
111+
# running, so neither is leaked.
112+
stderr_task.cancel()
113+
try:
114+
await stderr_task
115+
except asyncio.CancelledError:
116+
pass
117+
if proc.returncode is None:
118+
try:
119+
proc.terminate()
120+
except ProcessLookupError:
121+
pass
122+
await proc.wait()
123+
124+
125+
@activity.defn(name=RUN_CLAUDE_CODE_TURN_ACTIVITY)
126+
async def run_claude_code_turn(params: RunClaudeCodeTurnParams) -> dict[str, Any]:
127+
"""Run one Claude Code turn end-to-end and stream events to the task.
128+
129+
Runs in an activity (real asyncio loop) so subprocess I/O is permitted.
130+
"""
131+
emitter = UnifiedEmitter(
132+
task_id=params.task_id,
133+
trace_id=params.trace_id,
134+
parent_span_id=params.parent_span_id,
135+
)
136+
turn = ClaudeCodeTurn(_spawn_claude(params.prompt, session_id=params.session_id))
137+
result = await emitter.auto_send_turn(turn, created_at=params.created_at)
138+
139+
session_id: str | None = None
140+
if getattr(turn, "_result_envelope", None):
141+
session_id = turn._result_envelope.get("session_id")
142+
143+
return RunClaudeCodeTurnResult(final_text=result.final_text, session_id=session_id).model_dump()

examples/tutorials/10_async/10_temporal/140_claude_code/project/run_worker.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,15 @@
33
Run as a separate long-lived process alongside the ACP HTTP server. The
44
worker polls Temporal for workflow + activity tasks and executes them.
55
6-
Claude Code does not register custom activities here -- subprocess spawning
7-
happens directly in the workflow signal handler (workflow code) to keep
8-
the tutorial minimal. The built-in Agentex activities (state, messages,
9-
streaming, tracing) are registered via ``get_all_activities()``.
10-
11-
For production use, move the subprocess spawn into a custom activity so
12-
it gets Temporal's retry + timeout guarantees.
6+
The Claude Code CLI subprocess runs in the ``run_claude_code_turn`` activity
7+
(registered below alongside the built-in Agentex activities), because
8+
subprocess I/O is not permitted on the Temporal workflow event loop.
139
"""
1410

1511
import asyncio
1612

1713
from project.workflow import At140ClaudeCodeWorkflow
14+
from project.activities import run_claude_code_turn
1815
from agentex.lib.utils.debug import setup_debug_if_enabled
1916
from agentex.lib.utils.logging import make_logger
2017
from agentex.lib.environment_variables import EnvironmentVariables
@@ -35,7 +32,7 @@ async def main():
3532
worker = AgentexWorker(task_queue=task_queue_name)
3633

3734
await worker.run(
38-
activities=get_all_activities(),
35+
activities=[run_claude_code_turn, *get_all_activities()],
3936
workflow=At140ClaudeCodeWorkflow,
4037
)
4138

examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py

Lines changed: 34 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,32 @@
11
"""Temporal workflow for the Claude Code tutorial.
22
33
Holds conversation state (session_id for multi-turn resume) durably across
4-
crashes. Each user message triggers ``on_task_event_send``, which spawns the
5-
Claude Code CLI locally as an asyncio subprocess, wraps the stdout line
6-
stream in ``ClaudeCodeTurn``, and delivers the turn via
4+
crashes. Each user message triggers ``on_task_event_send``, which delegates the
5+
turn to the ``run_claude_code_turn`` activity. The activity spawns the Claude
6+
Code CLI, wraps its stdout in ``ClaudeCodeTurn``, and delivers the turn via
77
``UnifiedEmitter.auto_send_turn`` (the async Redis push path).
88
99
Note on subprocess inside Temporal
1010
------------------------------------
11-
Temporal activities, not workflow code, should do I/O. However, this tutorial
12-
executes the subprocess directly in the signal handler (workflow code) to keep
13-
the example minimal. For production use, move the subprocess spawn into a
14-
dedicated activity so it benefits from Temporal's retry and timeout guarantees.
15-
See ``examples/tutorials/10_async/10_temporal/030_custom_activities/`` for
16-
the activity pattern.
11+
Subprocess (and all other) I/O must run in a Temporal *activity*, never in
12+
workflow code. Temporal runs workflow + signal-handler bodies on a
13+
deterministic sandbox event loop that does not implement ``subprocess_exec``
14+
(spawning the CLI there raises ``NotImplementedError``). The activity also gets
15+
Temporal's retry + timeout guarantees. See
16+
``examples/tutorials/10_async/10_temporal/030_custom_activities/`` for the
17+
activity pattern.
1718
"""
1819

1920
from __future__ import annotations
2021

2122
import os
2223
import json
23-
import asyncio
24-
from typing import AsyncIterator
24+
from datetime import timedelta
2525

2626
from temporalio import workflow
2727

2828
from agentex.lib import adk
29-
from agentex.lib.adk import ClaudeCodeTurn
3029
from agentex.lib.types.acp import SendEventParams, CreateTaskParams
31-
from agentex.lib.core.harness import UnifiedEmitter
3230
from agentex.lib.types.tracing import SGPTracingProcessorConfig
3331
from agentex.lib.utils.logging import make_logger
3432
from agentex.types.text_content import TextContent
@@ -37,6 +35,9 @@
3735
from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
3836
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config
3937

38+
with workflow.unsafe.imports_passed_through():
39+
from project.activities import RunClaudeCodeTurnParams, run_claude_code_turn
40+
4041
add_tracing_processor_config(
4142
SGPTracingProcessorConfig(
4243
sgp_api_key=os.environ.get("SGP_API_KEY", ""),
@@ -55,80 +56,6 @@
5556
logger = make_logger(__name__)
5657

5758

58-
async def _spawn_claude(prompt: str, session_id: str | None = None) -> AsyncIterator[str]:
59-
"""Spawn ``claude -p --output-format stream-json`` locally and yield stdout lines.
60-
61-
Pass ``session_id`` to resume a previous Claude Code session (multi-turn
62-
memory via ``-r <session_id>``).
63-
64-
Injectable seam: tests monkeypatch this with a fake async iterator so no
65-
real CLI invocation is needed offline.
66-
"""
67-
cmd = [
68-
"claude",
69-
"-p",
70-
"--output-format",
71-
"stream-json",
72-
"--verbose",
73-
]
74-
if session_id:
75-
cmd.extend(["-r", session_id])
76-
77-
proc = await asyncio.create_subprocess_exec(
78-
*cmd,
79-
stdin=asyncio.subprocess.PIPE,
80-
stdout=asyncio.subprocess.PIPE,
81-
stderr=asyncio.subprocess.PIPE,
82-
)
83-
assert proc.stdout is not None
84-
assert proc.stdin is not None
85-
86-
proc.stdin.write(prompt.encode())
87-
proc.stdin.close()
88-
89-
# Drain stderr concurrently. With --verbose, Claude Code can write enough to
90-
# stderr to fill the OS pipe buffer; if we only read stdout, the CLI blocks
91-
# on its stderr write while we block reading stdout — a deadlock. A
92-
# background task keeps stderr flowing so stdout never stalls.
93-
async def _drain_stderr() -> None:
94-
assert proc.stderr is not None
95-
async for _ in proc.stderr:
96-
pass
97-
98-
stderr_task = asyncio.create_task(_drain_stderr())
99-
100-
try:
101-
buffer = ""
102-
async for chunk in proc.stdout:
103-
buffer += chunk.decode("utf-8", errors="replace")
104-
while "\n" in buffer:
105-
line, buffer = buffer.split("\n", 1)
106-
line = line.strip()
107-
if line:
108-
yield line
109-
110-
if buffer.strip():
111-
yield buffer.strip()
112-
113-
await proc.wait()
114-
finally:
115-
# Release the subprocess and stderr drain task even if the consumer
116-
# abandons the generator early (task cancellation / client disconnect):
117-
# cancel the drain task and terminate+reap the process if it is still
118-
# running, so neither is leaked.
119-
stderr_task.cancel()
120-
try:
121-
await stderr_task
122-
except asyncio.CancelledError:
123-
pass
124-
if proc.returncode is None:
125-
try:
126-
proc.terminate()
127-
except ProcessLookupError:
128-
pass
129-
await proc.wait()
130-
131-
13259
@workflow.defn(name=environment_variables.WORKFLOW_NAME)
13360
class At140ClaudeCodeWorkflow(BaseWorkflow):
13461
"""Temporal workflow that runs Claude Code locally for each user message.
@@ -161,29 +88,30 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
16188
name=f"Turn {self._turn_number}",
16289
input={"message": prompt},
16390
) as span:
164-
emitter = UnifiedEmitter(
165-
task_id=task_id,
166-
trace_id=task_id,
167-
parent_span_id=span.id if span else None,
91+
# Delegate the subprocess turn to an activity: subprocess I/O is not
92+
# permitted on the Temporal workflow event loop. The activity streams
93+
# events to the task and returns the final text + session_id.
94+
# workflow.now() gives a deterministic timestamp under replay.
95+
result = await workflow.execute_activity(
96+
run_claude_code_turn,
97+
RunClaudeCodeTurnParams(
98+
task_id=task_id,
99+
prompt=prompt,
100+
trace_id=task_id,
101+
parent_span_id=span.id if span else None,
102+
session_id=self._session_id,
103+
created_at=workflow.now(),
104+
),
105+
start_to_close_timeout=timedelta(minutes=5),
168106
)
169107

170-
# Use workflow.now() for deterministic timestamps under Temporal replay.
171-
created_at = workflow.now()
172-
173-
turn = ClaudeCodeTurn(_spawn_claude(prompt, session_id=self._session_id))
174-
result = await emitter.auto_send_turn(turn, created_at=created_at)
175-
176-
# Capture session_id from result envelope to enable resume on next turn.
177-
# ClaudeCodeTurn.usage() gives us access to the raw result envelope via
178-
# TurnUsage -- but session_id is not part of TurnUsage. We extract it
179-
# separately by looking at the turn's internal state post-exhaust.
180-
if hasattr(turn, "_result_envelope") and turn._result_envelope:
181-
sid = turn._result_envelope.get("session_id")
182-
if sid:
183-
self._session_id = sid
108+
# Capture session_id to enable Claude Code resume on the next turn.
109+
sid = result.get("session_id")
110+
if sid:
111+
self._session_id = sid
184112

185113
if span:
186-
span.output = {"final_text": result.final_text}
114+
span.output = {"final_text": result.get("final_text")}
187115

188116
@workflow.run
189117
async def on_task_create(self, params: CreateTaskParams) -> str:

0 commit comments

Comments
 (0)