|
| 1 | +"""Sync ACP handler for the Codex CLI harness tutorial. |
| 2 | +
|
| 3 | +Demonstrates the ``convert_codex_to_agentex_events`` tap + ``CodexTurn`` + |
| 4 | +``UnifiedEmitter`` for a sync (HTTP-yield) ACP agent. |
| 5 | +
|
| 6 | +The handler: |
| 7 | +1. Spawns ``codex exec --json`` as a LOCAL asyncio subprocess (no sandbox). |
| 8 | + This is correct for tutorials and local development; production isolation |
| 9 | + is handled by the golden agent's Scale sandbox at |
| 10 | + ``teams/sgp/agents/golden_agent/project/harness/providers/codex.py``. |
| 11 | +2. Wraps the stdout line stream in a ``CodexTurn``. |
| 12 | +3. Delivers every canonical ``StreamTaskMessage*`` event via |
| 13 | + ``UnifiedEmitter.yield_turn``, which traces + yields each event back to |
| 14 | + the HTTP caller in one pass. |
| 15 | +
|
| 16 | +Live runs require: |
| 17 | +- ``codex`` CLI on PATH (``npm install -g @openai/codex``) |
| 18 | +- ``OPENAI_API_KEY`` set in the environment |
| 19 | +""" |
| 20 | + |
| 21 | +from __future__ import annotations |
| 22 | + |
| 23 | +import os |
| 24 | +import time |
| 25 | +import asyncio |
| 26 | +from typing import AsyncGenerator |
| 27 | +from collections.abc import AsyncIterator |
| 28 | + |
| 29 | +from dotenv import load_dotenv |
| 30 | + |
| 31 | +load_dotenv() |
| 32 | + |
| 33 | +import agentex.lib.adk as adk |
| 34 | +from agentex.lib.adk import CodexTurn |
| 35 | +from agentex.lib.types.acp import SendMessageParams |
| 36 | +from agentex.lib.core.harness import UnifiedEmitter |
| 37 | +from agentex.lib.types.tracing import SGPTracingProcessorConfig |
| 38 | +from agentex.lib.utils.logging import make_logger |
| 39 | +from agentex.lib.sdk.fastacp.fastacp import FastACP |
| 40 | +from agentex.types.task_message_update import TaskMessageUpdate |
| 41 | +from agentex.types.task_message_content import TaskMessageContent |
| 42 | +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config |
| 43 | + |
| 44 | +logger = make_logger(__name__) |
| 45 | + |
| 46 | +add_tracing_processor_config( |
| 47 | + SGPTracingProcessorConfig( |
| 48 | + sgp_api_key=os.environ.get("SGP_API_KEY", ""), |
| 49 | + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), |
| 50 | + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), |
| 51 | + ) |
| 52 | +) |
| 53 | + |
| 54 | +acp = FastACP.create(acp_type="sync") |
| 55 | + |
| 56 | +MODEL = os.environ.get("CODEX_MODEL", "o4-mini") |
| 57 | + |
| 58 | + |
| 59 | +async def _spawn_codex(model: str) -> asyncio.subprocess.Process: |
| 60 | + """Spawn ``codex exec --json`` locally and return the live process. |
| 61 | +
|
| 62 | + Injection seam: tests replace this function with a fake that returns a |
| 63 | + mock process whose stdout yields pre-recorded event lines. |
| 64 | +
|
| 65 | + The flags mirror the golden agent (codex.py in the golden agent repo): |
| 66 | + --json machine-readable newline-delimited events |
| 67 | + --skip-git-repo-check safe to run outside a git repo |
| 68 | + --dangerously-bypass-approvals-and-sandbox |
| 69 | + skip interactive approval prompts in a |
| 70 | + non-interactive (server) context |
| 71 | + --model <model> which OpenAI model to use |
| 72 | +
|
| 73 | + The caller writes the prompt to stdin after the process starts, then |
| 74 | + closes stdin so codex knows input is complete. |
| 75 | + """ |
| 76 | + cmd = [ |
| 77 | + "codex", |
| 78 | + "exec", |
| 79 | + "--json", |
| 80 | + "--skip-git-repo-check", |
| 81 | + "--dangerously-bypass-approvals-and-sandbox", |
| 82 | + "--model", |
| 83 | + model, |
| 84 | + "-", # read prompt from stdin |
| 85 | + ] |
| 86 | + return await asyncio.create_subprocess_exec( |
| 87 | + *cmd, |
| 88 | + stdin=asyncio.subprocess.PIPE, |
| 89 | + stdout=asyncio.subprocess.PIPE, |
| 90 | + stderr=asyncio.subprocess.PIPE, |
| 91 | + env={**os.environ}, |
| 92 | + ) |
| 93 | + |
| 94 | + |
| 95 | +async def _process_stdout(process: asyncio.subprocess.Process) -> AsyncIterator[str]: |
| 96 | + """Yield newline-delimited JSON lines from the process stdout.""" |
| 97 | + assert process.stdout is not None |
| 98 | + buffer = "" |
| 99 | + while True: |
| 100 | + chunk = await process.stdout.read(4096) |
| 101 | + if not chunk: |
| 102 | + break |
| 103 | + buffer += chunk.decode("utf-8", errors="replace") |
| 104 | + while "\n" in buffer: |
| 105 | + line, buffer = buffer.split("\n", 1) |
| 106 | + line = line.strip() |
| 107 | + if line: |
| 108 | + yield line |
| 109 | + if buffer.strip(): |
| 110 | + yield buffer.strip() |
| 111 | + |
| 112 | + |
| 113 | +@acp.on_message_send |
| 114 | +async def handle_message_send( |
| 115 | + params: SendMessageParams, |
| 116 | +) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]: |
| 117 | + """Handle each message by running ``codex exec`` locally and streaming events.""" |
| 118 | + task_id = params.task.id |
| 119 | + user_message = params.content.content |
| 120 | + logger.info("Processing message for task %s", task_id) |
| 121 | + |
| 122 | + start_ms = int(time.monotonic() * 1000) |
| 123 | + |
| 124 | + async with adk.tracing.span( |
| 125 | + trace_id=task_id, |
| 126 | + task_id=task_id, |
| 127 | + name="message", |
| 128 | + input={"message": user_message}, |
| 129 | + data={"__span_type__": "AGENT_WORKFLOW"}, |
| 130 | + ) as turn_span: |
| 131 | + process = await _spawn_codex(MODEL) |
| 132 | + |
| 133 | + # Write prompt to stdin then close it so codex knows input is done. |
| 134 | + assert process.stdin is not None |
| 135 | + process.stdin.write(user_message.encode("utf-8")) |
| 136 | + await process.stdin.drain() |
| 137 | + process.stdin.close() |
| 138 | + |
| 139 | + duration_ms = int(time.monotonic() * 1000) - start_ms |
| 140 | + turn = CodexTurn( |
| 141 | + events=_process_stdout(process), |
| 142 | + model=MODEL, |
| 143 | + duration_ms=duration_ms, |
| 144 | + ) |
| 145 | + |
| 146 | + emitter = UnifiedEmitter( |
| 147 | + task_id=task_id, |
| 148 | + trace_id=task_id, |
| 149 | + parent_span_id=turn_span.id if turn_span else None, |
| 150 | + ) |
| 151 | + |
| 152 | + async for event in emitter.yield_turn(turn): |
| 153 | + yield event |
| 154 | + |
| 155 | + await process.wait() |
| 156 | + |
| 157 | + if turn_span: |
| 158 | + usage = turn.usage() |
| 159 | + turn_span.output = { |
| 160 | + "model": usage.model, |
| 161 | + "input_tokens": usage.input_tokens, |
| 162 | + "output_tokens": usage.output_tokens, |
| 163 | + } |
0 commit comments