diff --git a/src/agentex/lib/cli/commands/init.py b/src/agentex/lib/cli/commands/init.py index 63c16acc1..9849e9bbc 100644 --- a/src/agentex/lib/cli/commands/init.py +++ b/src/agentex/lib/cli/commands/init.py @@ -27,17 +27,20 @@ class TemplateType(str, Enum): TEMPORAL_PYDANTIC_AI = "temporal-pydantic-ai" TEMPORAL_LANGGRAPH = "temporal-langgraph" TEMPORAL_CLAUDE_CODE = "temporal-claude-code" + TEMPORAL_CODEX = "temporal-codex" DEFAULT = "default" DEFAULT_LANGGRAPH = "default-langgraph" DEFAULT_PYDANTIC_AI = "default-pydantic-ai" DEFAULT_OPENAI_AGENTS = "default-openai-agents" DEFAULT_CLAUDE_CODE = "default-claude-code" + DEFAULT_CODEX = "default-codex" SYNC = "sync" SYNC_OPENAI_AGENTS = "sync-openai-agents" SYNC_OPENAI_AGENTS_LOCAL_SANDBOX = "sync-openai-agents-local-sandbox" SYNC_LANGGRAPH = "sync-langgraph" SYNC_PYDANTIC_AI = "sync-pydantic-ai" SYNC_CLAUDE_CODE = "sync-claude-code" + SYNC_CODEX = "sync-codex" def render_template( @@ -71,17 +74,20 @@ def create_project_structure( TemplateType.TEMPORAL_PYDANTIC_AI: ["acp.py", "workflow.py", "run_worker.py", "agent.py", "tools.py"], TemplateType.TEMPORAL_LANGGRAPH: ["acp.py", "workflow.py", "run_worker.py", "graph.py", "tools.py"], TemplateType.TEMPORAL_CLAUDE_CODE: ["acp.py", "workflow.py", "run_worker.py", "activities.py"], + TemplateType.TEMPORAL_CODEX: ["acp.py", "workflow.py", "run_worker.py", "activities.py"], TemplateType.DEFAULT: ["acp.py"], TemplateType.DEFAULT_LANGGRAPH: ["acp.py", "graph.py", "tools.py"], TemplateType.DEFAULT_PYDANTIC_AI: ["acp.py", "agent.py", "tools.py"], TemplateType.DEFAULT_OPENAI_AGENTS: ["acp.py"], TemplateType.DEFAULT_CLAUDE_CODE: ["acp.py"], + TemplateType.DEFAULT_CODEX: ["acp.py"], TemplateType.SYNC: ["acp.py"], TemplateType.SYNC_OPENAI_AGENTS: ["acp.py"], TemplateType.SYNC_OPENAI_AGENTS_LOCAL_SANDBOX: ["acp.py", "agent.py", "tools.py"], TemplateType.SYNC_LANGGRAPH: ["acp.py", "graph.py", "tools.py"], TemplateType.SYNC_PYDANTIC_AI: ["acp.py", "agent.py", "tools.py"], 
TemplateType.SYNC_CLAUDE_CODE: ["acp.py"], + TemplateType.SYNC_CODEX: ["acp.py"], }[template_type] # Create project/code files @@ -196,6 +202,7 @@ def validate_agent_name(text: str) -> bool | str: {"name": "Async ACP + LangGraph", "value": TemplateType.DEFAULT_LANGGRAPH}, {"name": "Async ACP + Pydantic AI", "value": TemplateType.DEFAULT_PYDANTIC_AI}, {"name": "Async ACP + Claude Code", "value": TemplateType.DEFAULT_CLAUDE_CODE}, + {"name": "Async ACP + Codex", "value": TemplateType.DEFAULT_CODEX}, ], ).ask() if not template_type: @@ -209,6 +216,7 @@ def validate_agent_name(text: str) -> bool | str: {"name": "Temporal + Pydantic AI", "value": TemplateType.TEMPORAL_PYDANTIC_AI}, {"name": "Temporal + LangGraph", "value": TemplateType.TEMPORAL_LANGGRAPH}, {"name": "Temporal + Claude Code", "value": TemplateType.TEMPORAL_CLAUDE_CODE}, + {"name": "Temporal + Codex", "value": TemplateType.TEMPORAL_CODEX}, ], ).ask() if not template_type: @@ -223,6 +231,7 @@ def validate_agent_name(text: str) -> bool | str: {"name": "Sync ACP + LangGraph", "value": TemplateType.SYNC_LANGGRAPH}, {"name": "Sync ACP + Pydantic AI", "value": TemplateType.SYNC_PYDANTIC_AI}, {"name": "Sync ACP + Claude Code", "value": TemplateType.SYNC_CLAUDE_CODE}, + {"name": "Sync ACP + Codex", "value": TemplateType.SYNC_CODEX}, ], ).ask() if not template_type: diff --git a/src/agentex/lib/cli/templates/default-codex/.dockerignore.j2 b/src/agentex/lib/cli/templates/default-codex/.dockerignore.j2 new file mode 100644 index 000000000..c2d7fca4d --- /dev/null +++ b/src/agentex/lib/cli/templates/default-codex/.dockerignore.j2 @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git 
a/src/agentex/lib/cli/templates/default-codex/.env.example.j2 b/src/agentex/lib/cli/templates/default-codex/.env.example.j2 new file mode 100644 index 000000000..5d621a83e --- /dev/null +++ b/src/agentex/lib/cli/templates/default-codex/.env.example.j2 @@ -0,0 +1,13 @@ +# {{ agent_name }} - Environment Variables +# Copy this file to .env and fill in the values + +# API key used by the codex CLI (`codex exec` reads OPENAI_API_KEY directly) +OPENAI_API_KEY= + +# LLM base URL (optional - override to use a different provider) +# OPENAI_BASE_URL= + +# SGP Configuration (optional - for tracing) +# SGP_API_KEY= +# SGP_ACCOUNT_ID= +# SGP_CLIENT_BASE_URL= diff --git a/src/agentex/lib/cli/templates/default-codex/Dockerfile-uv.j2 b/src/agentex/lib/cli/templates/default-codex/Dockerfile-uv.j2 new file mode 100644 index 000000000..dffe96519 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-codex/Dockerfile-uv.j2 @@ -0,0 +1,51 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +# Install the codex CLI: the agent shells out to `codex` on every turn, so the +# binary must be present in the runtime image. 
+RUN npm install -g @openai/codex + +ENV UV_COMPILE_BYTECODE=1 +ENV UV_LINK_MODE=copy +ENV UV_HTTP_TIMEOUT=1000 + +WORKDIR /app/{{ project_path_from_build_root }} + +# Copy dependency files for layer caching +COPY {{ project_path_from_build_root }}/pyproject.toml {{ project_path_from_build_root }}/uv.lock ./ + +# Install dependencies (without project itself, for layer caching) +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --locked --no-install-project --no-dev + +# Copy the project code +COPY {{ project_path_from_build_root }}/project ./project + +# Install the project +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --locked --no-dev + +ENV PATH="/app/{{ project_path_from_build_root }}/.venv/bin:$PATH" +ENV PYTHONPATH=/app + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/default-codex/Dockerfile.j2 b/src/agentex/lib/cli/templates/default-codex/Dockerfile.j2 new file mode 100644 index 000000000..1a8eb1484 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-codex/Dockerfile.j2 @@ -0,0 +1,46 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Install the codex CLI: the agent shells out to `codex` on every turn, so the +# binary must be present in the runtime image. 
+RUN npm install -g @openai/codex + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy just the requirements file to optimize caching +COPY {{ project_path_from_build_root }}/requirements.txt /app/{{ project_path_from_build_root }}/requirements.txt + +WORKDIR /app/{{ project_path_from_build_root }} + +# Install the required Python packages +RUN uv pip install --system -r requirements.txt + +# Copy the project code +COPY {{ project_path_from_build_root }}/project /app/{{ project_path_from_build_root }}/project + +# Set environment variables +ENV PYTHONPATH=/app + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/default-codex/README.md.j2 b/src/agentex/lib/cli/templates/default-codex/README.md.j2 new file mode 100644 index 000000000..b82f1c5f2 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-codex/README.md.j2 @@ -0,0 +1,72 @@ +# {{ agent_name }} - AgentEx Async Codex Agent + +This template builds an **asynchronous** (non-Temporal) agent that drives the +**Codex CLI** through the unified harness surface on AgentEx: +- Spawns `codex exec --json` as a local subprocess +- Wraps the CLI's stdout stream in a `CodexTurn` +- Delivers canonical `StreamTaskMessage*` events via `UnifiedEmitter.auto_send_turn` + (the async Redis push path), so the UI receives output in real time +- Persists the codex session/thread ID via `adk.state` for multi-turn memory +- Tracing integration to SGP / AgentEx + +## Prerequisites + +- The `codex` CLI installed and on your `PATH` (`npm install -g @openai/codex`) +- An `OPENAI_API_KEY` in your environment + +## Running the Agent + +```bash +agentex agents run --manifest manifest.yaml +``` + +## Project Structure + +``` +{{ project_name }}/ +├── project/ +│ ├── __init__.py +│ └── acp.py # ACP server, subprocess spawn, state, and event handlers +├── Dockerfile +├── 
manifest.yaml +├── dev.ipynb +{% if use_uv %} +└── pyproject.toml +{% else %} +└── requirements.txt +{% endif %} +``` + +## Key Concepts + +### Async ACP with the harness +The async ACP model streams events over Redis instead of an HTTP response. The +`@acp.on_task_event_send` handler spawns the Codex CLI and pushes the harness +events to the task stream. + +### Multi-turn memory +The codex session/thread ID is persisted via `adk.state`, so each new turn +resumes the same codex session with `codex exec resume <thread_id>`. + +### The unified harness surface +`CodexTurn` + `UnifiedEmitter` are the unified harness surface. The turn +normalizes CLI output into canonical AgentEx events; the emitter traces and +delivers them. + +## Development + +### 1. Choose a model +Set `CODEX_MODEL` (defaults to `o4-mini`) to control which model codex uses. + +### 2. Customize the subprocess +Edit `_spawn_codex` in `project/acp.py` to change the CLI flags or how the +prompt is delivered. + +### 3. Configure Credentials +Set your credentials via `manifest.yaml`, an exported environment variable, or a +`.env` file in the project directory. + +### 4. 
Run Locally +```bash +export ENVIRONMENT=development && agentex agents run --manifest manifest.yaml +``` diff --git a/src/agentex/lib/cli/templates/default-codex/dev.ipynb.j2 b/src/agentex/lib/cli/templates/default-codex/dev.ipynb.j2 new file mode 100644 index 000000000..d3a68303f --- /dev/null +++ b/src/agentex/lib/cli/templates/default-codex/dev.ipynb.j2 @@ -0,0 +1,126 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "36834357", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c309d6", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"{{ agent_name }}\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f6e6ef0", + "metadata": {}, + "outputs": [], + "source": [ + "# (REQUIRED) Create a new task. For Async agents, you must create a task for messages to be associated with.\n", + "import uuid\n", + "\n", + "rpc_response = client.agents.create_task(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"name\": f\"{str(uuid.uuid4())[:8]}-task\",\n", + " \"params\": {}\n", + " }\n", + ")\n", + "\n", + "task = rpc_response.result\n", + "print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03b0d37", + "metadata": {}, + "outputs": [], + "source": [ + "# Send an event to the agent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When 
processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_event(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"task_id\": task.id,\n", + " }\n", + ")\n", + "\n", + "event = rpc_response.result\n", + "print(event)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6927cc0", + "metadata": {}, + "outputs": [], + "source": [ + "# Subscribe to the async task messages produced by the agent\n", + "from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages\n", + "\n", + "task_messages = subscribe_to_async_task_messages(\n", + " client=client,\n", + " task=task, \n", + " only_after_timestamp=event.created_at, \n", + " print_messages=True,\n", + " rich_print=True,\n", + " timeout=5,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4864e354", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/agentex/lib/cli/templates/default-codex/environments.yaml.j2 b/src/agentex/lib/cli/templates/default-codex/environments.yaml.j2 new file mode 100644 index 000000000..f802776f0 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-codex/environments.yaml.j2 @@ -0,0 +1,57 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. 
+# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. + +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# +# kubernetes: +# # OPTIONAL - Otherwise it will be derived separately. However, this can be used to override the derived +# # namespace and deploy it within the same namespace that already exists for a separate agent. +# namespace: "team-{{agent_name}}" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal: + enabled: false + + diff --git a/src/agentex/lib/cli/templates/default-codex/manifest.yaml.j2 b/src/agentex/lib/cli/templates/default-codex/manifest.yaml.j2 new file mode 100644 index 000000000..aef2fcb5f --- /dev/null +++ b/src/agentex/lib/cli/templates/default-codex/manifest.yaml.j2 @@ -0,0 +1,123 @@ +# Agent Manifest Configuration +# --------------------------- +# This file defines how your agent should be built and deployed. + +# Build Configuration +# ------------------ +# The build config defines what gets packaged into your agent's Docker image. 
+# This same configuration is used whether building locally or remotely. +# +# When building: +# 1. All files from include_paths are collected into a build context +# 2. The context is filtered by dockerignore rules +# 3. The Dockerfile uses this context to build your agent's image +# 4. The image is pushed to a registry and used to run your agent +build: + context: + # Root directory for the build context + root: ../ # Keep this as the default root + + # Paths to include in the Docker build context + # Must include: + # - Your agent's directory (your custom agent code) + # These paths are collected and sent to the Docker daemon for building + include_paths: + - {{ project_path_from_build_root }} + + # Path to your agent's Dockerfile + # This defines how your agent's image is built from the context + # Relative to the root directory + dockerfile: {{ project_path_from_build_root }}/Dockerfile + + # Path to your agent's .dockerignore + # Filters unnecessary files from the build context + # Helps keep build context small and builds fast + dockerignore: {{ project_path_from_build_root }}/.dockerignore + + +# Local Development Configuration +# ----------------------------- +# Only used when running the agent locally +local_development: + agent: + port: 8000 # Port where your local ACP server is running + host_address: host.docker.internal # Host address for Docker networking (host.docker.internal for Docker, localhost for direct) + + # File paths for local development (relative to this manifest.yaml) + paths: + # Path to ACP server file + # Examples: + # project/acp.py (standard) + # src/server.py (custom structure) + # ../shared/acp.py (shared across projects) + # /absolute/path/acp.py (absolute path) + acp: project/acp.py + + +# Agent Configuration +# ----------------- +agent: + acp_type: async + + # Unique name for your agent + # Used for task routing and monitoring + name: {{ agent_name }} + + # Description of what your agent does + # Helps with documentation and 
discovery + description: {{ description }} + + # Temporal workflow configuration + # Set enabled: true to use Temporal workflows for long-running tasks + temporal: + enabled: false + + # Optional: Credentials mapping + # Maps Kubernetes secrets to environment variables + # Common credentials include: + credentials: + # The codex CLI (`codex exec`) reads OPENAI_API_KEY directly; it does not + # use a LiteLLM key. + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + + # Optional: Set Environment variables for running your agent locally as well + # as for deployment later on. OPENAI_API_KEY is supplied via the credential + # mapping above (deploy) or your local .env. Do NOT set it to an empty string + # here — that would shadow the real key at runtime. + env: {} + # OPENAI_BASE_URL: "" + +# Deployment Configuration +# ----------------------- +# Configuration for deploying your agent to Kubernetes clusters +deployment: + # Container image configuration + image: + repository: "" # Update with your container registry + tag: "latest" # Default tag, should be versioned in production + + imagePullSecrets: [] # Update with your image pull secret names + # - name: my-registry-secret + + # Global deployment settings that apply to all clusters + # These can be overridden in cluster-specific environments (environments.yaml) + global: + # Default replica count + replicaCount: 1 + + # Default resource requirements + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/default-codex/project/acp.py.j2 b/src/agentex/lib/cli/templates/default-codex/project/acp.py.j2 new file mode 100644 index 000000000..f3fd91104 --- /dev/null +++ 
b/src/agentex/lib/cli/templates/default-codex/project/acp.py.j2 @@ -0,0 +1,232 @@ +"""Async (base) ACP handler for {{ agent_name }} — a Codex CLI harness agent. + +Demonstrates the ``convert_codex_to_agentex_events`` tap + ``CodexTurn`` + +``UnifiedEmitter`` for an async (Redis-streaming) ACP agent without Temporal. + +The handler: +1. Spawns ``codex exec --json`` as a LOCAL asyncio subprocess (no sandbox). + This is correct for local development; production isolation is a separate + concern. +2. Wraps the stdout line stream in a ``CodexTurn``. +3. Delivers every canonical ``StreamTaskMessage*`` event to Redis via + ``UnifiedEmitter.auto_send_turn``, so the UI receives tokens in real time. +4. Multi-turn memory is persisted via ``adk.state``. + +Live runs require: +- ``codex`` CLI on PATH (``npm install -g @openai/codex``) +- ``OPENAI_API_KEY`` set in the environment +""" + +from __future__ import annotations + +import os +import time +import codecs +import asyncio +from collections.abc import AsyncIterator + +from dotenv import load_dotenv + +load_dotenv() + +import agentex.lib.adk as adk +from agentex.lib.adk import CodexTurn +from agentex.lib.types.acp import SendEventParams, CancelTaskParams, CreateTaskParams +from agentex.lib.core.harness import UnifiedEmitter +from agentex.lib.types.fastacp import AsyncACPConfig +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.lib.utils.model_utils import BaseModel +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +logger = make_logger(__name__) + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + ) +) + +acp = FastACP.create( + acp_type="async", + 
config=AsyncACPConfig(type="base"), +) + +MODEL = os.environ.get("CODEX_MODEL", "o4-mini") + + +class ConversationState(BaseModel): + """Per-task conversation state persisted via ``adk.state``. + + We store the codex session/thread ID so subsequent turns can resume the + same codex session via ``codex exec resume <thread_id>``. + """ + + codex_thread_id: str | None = None + turn_number: int = 0 + + +async def _spawn_codex( + model: str, + thread_id: str | None = None, +) -> asyncio.subprocess.Process: + """Spawn ``codex exec --json`` locally and return the live process. + + Injection seam: tests replace this function with a fake that returns a + mock process whose stdout yields pre-recorded event lines. + + When ``thread_id`` is provided the subcommand becomes + ``codex exec ... resume <thread_id> -`` so codex continues the prior + conversation thread. + + The caller writes the prompt to stdin after the process starts, then + closes stdin so codex knows input is complete. + """ + base_flags = [ + "--json", + "--skip-git-repo-check", + "--dangerously-bypass-approvals-and-sandbox", + "--model", + model, + ] + + if thread_id: + cmd = ["codex", "exec", *base_flags, "resume", thread_id, "-"] + else: + cmd = ["codex", "exec", *base_flags, "-"] + + return await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + # Discard stderr: codex --json writes events to stdout; its stderr is + # progress/debug noise. Capturing it with PIPE but never reading it + # would deadlock once codex fills the OS pipe buffer (~64 KB). + stderr=asyncio.subprocess.DEVNULL, + env={**os.environ}, + ) + + +async def _process_stdout(process: asyncio.subprocess.Process) -> AsyncIterator[str]: + """Yield newline-delimited JSON lines from the process stdout. + + Uses an incremental UTF-8 decoder so a multibyte character split across two + 4 KB reads is decoded correctly instead of being corrupted at the boundary. 
+ """ + assert process.stdout is not None + decoder = codecs.getincrementaldecoder("utf-8")(errors="replace") + buffer = "" + while True: + chunk = await process.stdout.read(4096) + if not chunk: + break + buffer += decoder.decode(chunk) + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + line = line.strip() + if line: + yield line + buffer += decoder.decode(b"", final=True) + if buffer.strip(): + yield buffer.strip() + + +@acp.on_task_create +async def handle_task_create(params: CreateTaskParams): + """Initialize per-task state on task creation.""" + logger.info("Task created: %s", params.task.id) + await adk.state.create( + task_id=params.task.id, + agent_id=params.agent.id, + state=ConversationState(), + ) + + +@acp.on_task_event_send +async def handle_task_event_send(params: SendEventParams): + """Handle each user message: spawn codex, stream events, save thread ID.""" + task_id = params.task.id + agent_id = params.agent.id + user_message = params.event.content.content + + logger.info("Processing message for task %s", task_id) + + await adk.messages.create(task_id=task_id, content=params.event.content) + + task_state = await adk.state.get_by_task_and_agent(task_id=task_id, agent_id=agent_id) + if task_state is None: + state = ConversationState() + task_state = await adk.state.create(task_id=task_id, agent_id=agent_id, state=state) + else: + state = ConversationState.model_validate(task_state.state) + + state.turn_number += 1 + + async with adk.tracing.span( + trace_id=task_id, + task_id=task_id, + name=f"Turn {state.turn_number}", + input={"message": user_message}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as turn_span: + start_ms = int(time.monotonic() * 1000) + + process = await _spawn_codex(MODEL, thread_id=state.codex_thread_id) + + assert process.stdin is not None + process.stdin.write(user_message.encode("utf-8")) + await process.stdin.drain() + process.stdin.close() + + turn = CodexTurn( + events=_process_stdout(process), + 
model=MODEL, + ) + + emitter = UnifiedEmitter( + task_id=task_id, + trace_id=task_id, + parent_span_id=turn_span.id if turn_span else None, + ) + + # Guarantee the subprocess is reaped even if auto_send_turn raises + # (e.g. a Redis error); otherwise codex stays blocked writing to a full + # stdout pipe buffer and the OS process leaks until the server restarts. + try: + result = await emitter.auto_send_turn(turn) + finally: + if process.returncode is None: + process.kill() + await process.wait() + + # Record the real wall-clock duration AFTER streaming completes; setting + # it before the stream ran would capture only subprocess spawn overhead. + turn.duration_ms = int(time.monotonic() * 1000) - start_ms + + usage = turn.usage() + + # Persist the codex session id (public accessor; valid post-stream) so the + # next turn resumes the same session. + if turn.session_id: + state.codex_thread_id = turn.session_id + + await adk.state.update( + state_id=task_state.id, + task_id=task_id, + agent_id=agent_id, + state=state, + ) + + if turn_span: + turn_span.output = { + "final_text": result.final_text, + "model": usage.model, + } + + +@acp.on_task_cancel +async def handle_task_canceled(params: CancelTaskParams): + logger.info("Task canceled: %s", params.task.id) diff --git a/src/agentex/lib/cli/templates/default-codex/pyproject.toml.j2 b/src/agentex/lib/cli/templates/default-codex/pyproject.toml.j2 new file mode 100644 index 000000000..e499b1dc1 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-codex/pyproject.toml.j2 @@ -0,0 +1,33 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "{{ project_name }}" +version = "0.1.0" +description = "{{ description }}" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "python-dotenv>=1.0,<2", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "black", + "isort", + "flake8", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + 
+[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/src/agentex/lib/cli/templates/default-codex/requirements.txt.j2 b/src/agentex/lib/cli/templates/default-codex/requirements.txt.j2 new file mode 100644 index 000000000..8c0630384 --- /dev/null +++ b/src/agentex/lib/cli/templates/default-codex/requirements.txt.j2 @@ -0,0 +1,8 @@ +# Install agentex-sdk from local path +agentex-sdk + +# Scale GenAI Platform Python SDK +scale-gp + +# Loads .env files for local development +python-dotenv>=1.0,<2 diff --git a/src/agentex/lib/cli/templates/sync-codex/.dockerignore.j2 b/src/agentex/lib/cli/templates/sync-codex/.dockerignore.j2 new file mode 100644 index 000000000..c2d7fca4d --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/.dockerignore.j2 @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/src/agentex/lib/cli/templates/sync-codex/.env.example.j2 b/src/agentex/lib/cli/templates/sync-codex/.env.example.j2 new file mode 100644 index 000000000..5d621a83e --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/.env.example.j2 @@ -0,0 +1,13 @@ +# {{ agent_name }} - Environment Variables +# Copy this file to .env and fill in the values + +# API key used by the codex CLI (`codex exec` reads OPENAI_API_KEY directly) +OPENAI_API_KEY= + +# LLM base URL (optional - override to use a different provider) +# OPENAI_BASE_URL= + +# SGP Configuration (optional - for tracing) +# SGP_API_KEY= +# SGP_ACCOUNT_ID= +# SGP_CLIENT_BASE_URL= diff --git a/src/agentex/lib/cli/templates/sync-codex/Dockerfile-uv.j2 b/src/agentex/lib/cli/templates/sync-codex/Dockerfile-uv.j2 new 
file mode 100644 index 000000000..dffe96519 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/Dockerfile-uv.j2 @@ -0,0 +1,51 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +# Install the codex CLI: the agent shells out to `codex` on every turn, so the +# binary must be present in the runtime image. +RUN npm install -g @openai/codex + +ENV UV_COMPILE_BYTECODE=1 +ENV UV_LINK_MODE=copy +ENV UV_HTTP_TIMEOUT=1000 + +WORKDIR /app/{{ project_path_from_build_root }} + +# Copy dependency files for layer caching +COPY {{ project_path_from_build_root }}/pyproject.toml {{ project_path_from_build_root }}/uv.lock ./ + +# Install dependencies (without project itself, for layer caching) +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --locked --no-install-project --no-dev + +# Copy the project code +COPY {{ project_path_from_build_root }}/project ./project + +# Install the project +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --locked --no-dev + +ENV PATH="/app/{{ project_path_from_build_root }}/.venv/bin:$PATH" +ENV PYTHONPATH=/app + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/sync-codex/Dockerfile.j2 b/src/agentex/lib/cli/templates/sync-codex/Dockerfile.j2 new file mode 100644 index 000000000..afa4470d9 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/Dockerfile.j2 @@ -0,0 +1,47 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && 
apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Install the codex CLI: the agent shells out to `codex` on every turn, so the +# binary must be present in the runtime image. +RUN npm install -g @openai/codex + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy just the requirements file to optimize caching +COPY {{ project_path_from_build_root }}/requirements.txt /app/{{ project_path_from_build_root }}/requirements.txt + +WORKDIR /app/{{ project_path_from_build_root }} + +# Install the required Python packages +RUN uv pip install --system -r requirements.txt + +# Copy the project code +COPY {{ project_path_from_build_root }}/project /app/{{ project_path_from_build_root }}/project + + +# Set environment variables +ENV PYTHONPATH=/app + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/sync-codex/README.md.j2 b/src/agentex/lib/cli/templates/sync-codex/README.md.j2 new file mode 100644 index 000000000..4ca1aeccf --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/README.md.j2 @@ -0,0 +1,67 @@ +# {{ agent_name }} - AgentEx Sync Codex Agent + +This template builds a **synchronous** agent that drives the **Codex CLI** +through the unified harness surface on AgentEx: +- Spawns `codex exec --json` as a local subprocess +- Wraps the CLI's stdout stream in a `CodexTurn` +- Delivers canonical `StreamTaskMessage*` events via `UnifiedEmitter.yield_turn` + (the sync HTTP yield path) +- Tracing integration to SGP / AgentEx + +## Prerequisites + +- The `codex` CLI installed and on your `PATH` (`npm install -g @openai/codex`) +- An `OPENAI_API_KEY` in your environment + +## Running the Agent + +```bash +agentex 
agents run --manifest manifest.yaml +``` + +## Project Structure + +``` +{{ project_name }}/ +├── project/ +│ ├── __init__.py +│ └── acp.py # ACP server, subprocess spawn, and message handler +├── Dockerfile +├── manifest.yaml +├── dev.ipynb +{% if use_uv %} +└── pyproject.toml +{% else %} +└── requirements.txt +{% endif %} +``` + +## Key Concepts + +### Sync ACP with the harness +The sync ACP model uses HTTP request/response. The `@acp.on_message_send` +handler spawns the Codex CLI and yields the harness events back to the client +as they arrive. + +### The unified harness surface +`CodexTurn` + `UnifiedEmitter` are the unified harness surface. The turn +normalizes CLI output into canonical AgentEx events; the emitter traces and +delivers them. + +## Development + +### 1. Choose a model +Set `CODEX_MODEL` (defaults to `o4-mini`) to control which model codex uses. + +### 2. Customize the subprocess +Edit `_spawn_codex` in `project/acp.py` to change the CLI flags or how the +prompt is delivered. + +### 3. Configure Credentials +Set your credentials via `manifest.yaml`, an exported environment variable, or a +`.env` file in the project directory. + +### 4. 
Run Locally +```bash +export ENVIRONMENT=development && agentex agents run --manifest manifest.yaml +``` diff --git a/src/agentex/lib/cli/templates/sync-codex/dev.ipynb.j2 b/src/agentex/lib/cli/templates/sync-codex/dev.ipynb.j2 new file mode 100644 index 000000000..b0691b1b1 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/dev.ipynb.j2 @@ -0,0 +1,167 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "36834357", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c309d6", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"{{ agent_name }}\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f6e6ef0", + "metadata": {}, + "outputs": [], + "source": [ + "# # (Optional) Create a new task. If you don't create a new task, each message will be sent to a new task. 
The server will create the task for you.\n", + "\n", + "# import uuid\n", + "\n", + "# TASK_ID = str(uuid.uuid4())[:8]\n", + "\n", + "# rpc_response = client.agents.rpc_by_name(\n", + "# agent_name=AGENT_NAME,\n", + "# method=\"task/create\",\n", + "# params={\n", + "# \"name\": f\"{TASK_ID}-task\",\n", + "# \"params\": {}\n", + "# }\n", + "# )\n", + "\n", + "# task = rpc_response.result\n", + "# print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03b0d37", + "metadata": {}, + "outputs": [], + "source": [ + "# Test non streaming response\n", + "from agentex.types import TextContent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_message(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"stream\": False\n", + " }\n", + ")\n", + "\n", + "if not rpc_response or not rpc_response.result:\n", + " raise ValueError(\"No result in response\")\n", + "\n", + "# Extract and print just the text content from the response\n", + "for task_message in rpc_response.result:\n", + " content = task_message.content\n", + " if isinstance(content, TextContent):\n", + " text = content.content\n", + " print(text)\n" + ] + }, + { + "cell_type": "code", +
"execution_count": null, + "id": "79688331", + "metadata": {}, + "outputs": [], + "source": [ + "# Test streaming response\n", + "from agentex.types.task_message_update import StreamTaskMessageDelta, StreamTaskMessageFull\n", + "from agentex.types.text_delta import TextDelta\n", + "\n", + "\n", + "# The result object of message/send will be a TaskMessageUpdate which is a union of the following types:\n", + "# - StreamTaskMessageStart: \n", + "# - An indicator that a streaming message was started, doesn't contain any useful content\n", + "# - StreamTaskMessageDelta: \n", + "# - A delta of a streaming message, contains the text delta to aggregate\n", + "# - StreamTaskMessageDone: \n", + "# - An indicator that a streaming message was done, doesn't contain any useful content\n", + "# - StreamTaskMessageFull: \n", + "# - A non-streaming message, there is nothing to aggregate, since this contains the full message, not deltas\n", + "\n", + "# When processing StreamTaskMessageDelta, if you are expecting more than TextDeltas, such as DataDelta, ToolRequestDelta, or ToolResponseDelta, you can process them as well\n", + "# When processing StreamTaskMessageFull, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "for agent_rpc_response_chunk in client.agents.send_message_stream(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"stream\": True\n", + " }\n", + "):\n", + " # We know that the result of the message/send when stream is set to True will be a TaskMessageUpdate\n", + " task_message_update = agent_rpc_response_chunk.result\n", + " # Print only the text deltas as they arrive or any full messages\n", + " if isinstance(task_message_update, StreamTaskMessageDelta):\n", + " delta = task_message_update.delta\n", + " if isinstance(delta, TextDelta):\n", + "
print(delta.text_delta, end=\"\", flush=True)\n", + " else:\n", + " print(f\"Found non-text {type(task_message_update)} object in streaming message.\")\n", + " elif isinstance(task_message_update, StreamTaskMessageFull):\n", + " content = task_message_update.content\n", + " if isinstance(content, TextContent):\n", + " print(content.content)\n", + " else:\n", + " print(f\"Found non-text {type(task_message_update)} object in full message.\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5e7e042", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/agentex/lib/cli/templates/sync-codex/environments.yaml.j2 b/src/agentex/lib/cli/templates/sync-codex/environments.yaml.j2 new file mode 100644 index 000000000..73924abdd --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/environments.yaml.j2 @@ -0,0 +1,53 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. +# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. 
+ +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# kubernetes: +# # OPTIONAL - Otherwise it will be derived separately. However, this can be used to override the derived +# # namespace and deploy it within the same namespace that already exists for a separate agent. +# namespace: "team-{{agent_name}}" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + diff --git a/src/agentex/lib/cli/templates/sync-codex/manifest.yaml.j2 b/src/agentex/lib/cli/templates/sync-codex/manifest.yaml.j2 new file mode 100644 index 000000000..8810f6175 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/manifest.yaml.j2 @@ -0,0 +1,120 @@ +# Agent Manifest Configuration +# --------------------------- +# This file defines how your agent should be built and deployed. + +# Build Configuration +# ------------------ +# The build config defines what gets packaged into your agent's Docker image. +# This same configuration is used whether building locally or remotely. +# +# When building: +# 1. All files from include_paths are collected into a build context +# 2.
The context is filtered by dockerignore rules +# 3. The Dockerfile uses this context to build your agent's image +# 4. The image is pushed to a registry and used to run your agent +build: + context: + # Root directory for the build context + root: ../ # Keep this as the default root + + # Paths to include in the Docker build context + # Must include: + # - Your agent's directory (your custom agent code) + # These paths are collected and sent to the Docker daemon for building + include_paths: + - {{ project_path_from_build_root }} + + # Path to your agent's Dockerfile + # This defines how your agent's image is built from the context + # Relative to the root directory + dockerfile: {{ project_path_from_build_root }}/Dockerfile + + # Path to your agent's .dockerignore + # Filters unnecessary files from the build context + # Helps keep build context small and builds fast + dockerignore: {{ project_path_from_build_root }}/.dockerignore + + +# Local Development Configuration +# ----------------------------- +# Only used when running the agent locally +local_development: + agent: + port: 8000 # Port where your local ACP server is running + host_address: host.docker.internal # Host address for Docker networking (host.docker.internal for Docker, localhost for direct) + + # File paths for local development (relative to this manifest.yaml) + paths: + # Path to ACP server file + # Examples: + # project/acp.py (standard) + # src/server.py (custom structure) + # ../shared/acp.py (shared across projects) + # /absolute/path/acp.py (absolute path) + acp: project/acp.py + + +# Agent Configuration +# ----------------- +agent: + acp_type: sync + # Unique name for your agent + # Used for task routing and monitoring + name: {{ agent_name }} + + # Description of what your agent does + # Helps with documentation and discovery + description: {{ description }} + + # Temporal workflow configuration + # Set enabled: true to use Temporal workflows for long-running tasks + temporal: + enabled: 
false + + # Optional: Credentials mapping + # Maps Kubernetes secrets to environment variables + # Common credentials include: + credentials: + # The codex CLI (`codex exec`) reads OPENAI_API_KEY directly; it does not + # use a LiteLLM key. + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + + # Optional: Set Environment variables for running your agent locally as well + # as for deployment later on. OPENAI_API_KEY is supplied via the credential + # mapping above (deploy) or your local .env. Do NOT set it to an empty string + # here — that would shadow the real key at runtime. + env: {} + # OPENAI_BASE_URL: "" + + +# Deployment Configuration +# ----------------------- +# Configuration for deploying your agent to Kubernetes clusters +deployment: + # Container image configuration + image: + repository: "" # Update with your container registry + tag: "latest" # Default tag, should be versioned in production + + imagePullSecrets: [] # Update with your image pull secret names + # - name: my-registry-secret + + # Global deployment settings that apply to all clusters + # These can be overridden in cluster-specific environments (environments.yaml) + global: + # Default replica count + replicaCount: 1 + + # Default resource requirements + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/sync-codex/project/acp.py.j2 b/src/agentex/lib/cli/templates/sync-codex/project/acp.py.j2 new file mode 100644 index 000000000..721721d41 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/project/acp.py.j2 @@ -0,0 +1,180 @@ +"""Sync ACP handler for {{ agent_name }} — a Codex CLI harness agent. + +Demonstrates the ``convert_codex_to_agentex_events`` tap + ``CodexTurn`` + +``UnifiedEmitter`` for a sync (HTTP-yield) ACP agent. + +The handler: +1. 
Spawns ``codex exec --json`` as a LOCAL asyncio subprocess (no sandbox). + This is correct for local development; production isolation is a separate + concern. +2. Wraps the stdout line stream in a ``CodexTurn``. +3. Delivers every canonical ``StreamTaskMessage*`` event via + ``UnifiedEmitter.yield_turn``, which traces + yields each event back to + the HTTP caller in one pass. + +Live runs require: +- ``codex`` CLI on PATH (``npm install -g @openai/codex``) +- ``OPENAI_API_KEY`` set in the environment +""" + +from __future__ import annotations + +import os +import time +import codecs +import asyncio +from typing import AsyncGenerator +from collections.abc import AsyncIterator + +from dotenv import load_dotenv + +load_dotenv() + +import agentex.lib.adk as adk +from agentex.lib.adk import CodexTurn +from agentex.lib.types.acp import SendMessageParams +from agentex.lib.core.harness import UnifiedEmitter +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.types.task_message_update import TaskMessageUpdate +from agentex.types.task_message_content import TaskMessageContent +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +logger = make_logger(__name__) + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + ) +) + +acp = FastACP.create(acp_type="sync") + +MODEL = os.environ.get("CODEX_MODEL", "o4-mini") + + +async def _spawn_codex(model: str) -> asyncio.subprocess.Process: + """Spawn ``codex exec --json`` locally and return the live process. + + Injection seam: tests replace this function with a fake that returns a + mock process whose stdout yields pre-recorded event lines. 
+ + The flags: + --json machine-readable newline-delimited events + --skip-git-repo-check safe to run outside a git repo + --dangerously-bypass-approvals-and-sandbox + skip interactive approval prompts in a + non-interactive (server) context + --model which OpenAI model to use + + The caller writes the prompt to stdin after the process starts, then + closes stdin so codex knows input is complete. + """ + cmd = [ + "codex", + "exec", + "--json", + "--skip-git-repo-check", + "--dangerously-bypass-approvals-and-sandbox", + "--model", + model, + "-", # read prompt from stdin + ] + return await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + # Discard stderr: codex --json writes events to stdout; its stderr is + # progress/debug noise. Capturing it with PIPE but never reading it + # would deadlock once codex fills the OS pipe buffer (~64 KB). + stderr=asyncio.subprocess.DEVNULL, + env={**os.environ}, + ) + + +async def _process_stdout(process: asyncio.subprocess.Process) -> AsyncIterator[str]: + """Yield newline-delimited JSON lines from the process stdout. + + Uses an incremental UTF-8 decoder so a multibyte character split across two + 4 KB reads is decoded correctly instead of being corrupted at the boundary. 
+ """ + assert process.stdout is not None + decoder = codecs.getincrementaldecoder("utf-8")(errors="replace") + buffer = "" + while True: + chunk = await process.stdout.read(4096) + if not chunk: + break + buffer += decoder.decode(chunk) + while "\n" in buffer: + line, buffer = buffer.split("\n", 1) + line = line.strip() + if line: + yield line + buffer += decoder.decode(b"", final=True) + if buffer.strip(): + yield buffer.strip() + + +@acp.on_message_send +async def handle_message_send( + params: SendMessageParams, +) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]: + """Handle each message by running ``codex exec`` locally and streaming events.""" + task_id = params.task.id + user_message = params.content.content + logger.info("Processing message for task %s", task_id) + + start_ms = int(time.monotonic() * 1000) + + async with adk.tracing.span( + trace_id=task_id, + task_id=task_id, + name="message", + input={"message": user_message}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as turn_span: + process = await _spawn_codex(MODEL) + + # Write prompt to stdin then close it so codex knows input is done. + assert process.stdin is not None + process.stdin.write(user_message.encode("utf-8")) + await process.stdin.drain() + process.stdin.close() + + turn = CodexTurn( + events=_process_stdout(process), + model=MODEL, + ) + + emitter = UnifiedEmitter( + task_id=task_id, + trace_id=task_id, + parent_span_id=turn_span.id if turn_span else None, + ) + + # Guarantee the subprocess is reaped even if the generator is abandoned + # (client disconnect / GC) or yield_turn raises; otherwise codex stays + # blocked writing to a full stdout pipe buffer and the process leaks. 
+ try: + async for event in emitter.yield_turn(turn): + yield event + finally: + if process.returncode is None: + process.kill() + await process.wait() + + # Record the real wall-clock duration AFTER streaming completes; setting + # it before the stream ran would capture only subprocess spawn overhead. + turn.duration_ms = int(time.monotonic() * 1000) - start_ms + + if turn_span: + usage = turn.usage() + turn_span.output = { + "model": usage.model, + "input_tokens": usage.input_tokens, + "output_tokens": usage.output_tokens, + } diff --git a/src/agentex/lib/cli/templates/sync-codex/pyproject.toml.j2 b/src/agentex/lib/cli/templates/sync-codex/pyproject.toml.j2 new file mode 100644 index 000000000..e499b1dc1 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/pyproject.toml.j2 @@ -0,0 +1,33 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "{{ project_name }}" +version = "0.1.0" +description = "{{ description }}" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "python-dotenv>=1.0,<2", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "black", + "isort", + "flake8", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/src/agentex/lib/cli/templates/sync-codex/requirements.txt.j2 b/src/agentex/lib/cli/templates/sync-codex/requirements.txt.j2 new file mode 100644 index 000000000..8c0630384 --- /dev/null +++ b/src/agentex/lib/cli/templates/sync-codex/requirements.txt.j2 @@ -0,0 +1,8 @@ +# Install agentex-sdk from local path +agentex-sdk + +# Scale GenAI Platform Python SDK +scale-gp + +# Loads .env files for local development +python-dotenv>=1.0,<2 diff --git a/src/agentex/lib/cli/templates/temporal-codex/.dockerignore.j2 b/src/agentex/lib/cli/templates/temporal-codex/.dockerignore.j2 new file mode 100644 index 000000000..c2d7fca4d --- 
/dev/null +++ b/src/agentex/lib/cli/templates/temporal-codex/.dockerignore.j2 @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/src/agentex/lib/cli/templates/temporal-codex/.env.example.j2 b/src/agentex/lib/cli/templates/temporal-codex/.env.example.j2 new file mode 100644 index 000000000..5d621a83e --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-codex/.env.example.j2 @@ -0,0 +1,13 @@ +# {{ agent_name }} - Environment Variables +# Copy this file to .env and fill in the values + +# API key used by the codex CLI (`codex exec` reads OPENAI_API_KEY directly) +OPENAI_API_KEY= + +# LLM base URL (optional - override to use a different provider) +# OPENAI_BASE_URL= + +# SGP Configuration (optional - for tracing) +# SGP_API_KEY= +# SGP_ACCOUNT_ID= +# SGP_CLIENT_BASE_URL= diff --git a/src/agentex/lib/cli/templates/temporal-codex/Dockerfile-uv.j2 b/src/agentex/lib/cli/templates/temporal-codex/Dockerfile-uv.j2 new file mode 100644 index 000000000..c72c7144c --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-codex/Dockerfile-uv.j2 @@ -0,0 +1,61 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +# Install the codex CLI: the activity shells out to `codex` on every turn, so +# the binary must be present in the runtime image. 
+RUN npm install -g @openai/codex + +# Install tctl (Temporal CLI) +RUN ARCH="$(uname -m)" && \ + case "$ARCH" in x86_64) TCTL_ARCH=amd64 ;; aarch64|arm64) TCTL_ARCH=arm64 ;; *) TCTL_ARCH=amd64 ;; esac && \ + curl -L "https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_${TCTL_ARCH}.tar.gz" -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +ENV UV_COMPILE_BYTECODE=1 +ENV UV_LINK_MODE=copy +ENV UV_HTTP_TIMEOUT=1000 + +WORKDIR /app/{{ project_path_from_build_root }} + +# Copy dependency files for layer caching +COPY {{ project_path_from_build_root }}/pyproject.toml {{ project_path_from_build_root }}/uv.lock ./ + +# Install dependencies (without project itself, for layer caching) +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --locked --no-install-project --no-dev + +# Copy the project code +COPY {{ project_path_from_build_root }}/project ./project + +# Install the project +RUN --mount=type=cache,target=/root/.cache/uv \ + uv sync --locked --no-dev + +ENV PATH="/app/{{ project_path_from_build_root }}/.venv/bin:$PATH" + +# Run the ACP server using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-codex/Dockerfile.j2 b/src/agentex/lib/cli/templates/temporal-codex/Dockerfile.j2 new file mode 100644 index 000000000..0ae4e2079 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-codex/Dockerfile.j2 @@ -0,0 +1,54 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + 
netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Install the codex CLI: the activity shells out to `codex` on every turn, so +# the binary must be present in the runtime image. +RUN npm install -g @openai/codex + +# Install tctl (Temporal CLI) +RUN ARCH="$(uname -m)" && \ + case "$ARCH" in x86_64) TCTL_ARCH=amd64 ;; aarch64|arm64) TCTL_ARCH=arm64 ;; *) TCTL_ARCH=amd64 ;; esac && \ + curl -L "https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_${TCTL_ARCH}.tar.gz" -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy just the requirements file to optimize caching +COPY {{ project_path_from_build_root }}/requirements.txt /app/{{ project_path_from_build_root }}/requirements.txt + +WORKDIR /app/{{ project_path_from_build_root }} + +# Install the required Python packages +RUN uv pip install --system -r requirements.txt + +# Copy the project code +COPY {{ project_path_from_build_root }}/project /app/{{ project_path_from_build_root }}/project + +# Run the ACP server using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-codex/README.md.j2 b/src/agentex/lib/cli/templates/temporal-codex/README.md.j2 new file mode 100644 index 000000000..794109ff3 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-codex/README.md.j2 @@ -0,0 +1,80 @@ +# {{ agent_name }} — AgentEx Temporal + Codex + +This template builds a **Temporal-durable** agent that drives the **Codex CLI** +through the unified harness surface on AgentEx: +- A Temporal workflow holds conversation state (the codex thread ID) durably + across 
worker crashes — no external state store needed +- Each turn delegates to the `run_codex_turn` activity, which spawns the CLI + (subprocess I/O is not permitted on the workflow event loop) +- The activity wraps the CLI's stdout stream in a `CodexTurn` and delivers + canonical `StreamTaskMessage*` events via `UnifiedEmitter.auto_send_turn` +- Tracing integration to SGP / AgentEx + +## Prerequisites + +- The `codex` CLI installed and on your `PATH` (`npm install -g @openai/codex`) +- An `OPENAI_API_KEY` in your environment +- A running Temporal service (provided automatically by the local dev stack) + +## Running the Agent + +```bash +agentex agents run --manifest manifest.yaml +``` + +This starts both the ACP HTTP server and the Temporal worker. + +## Project Structure + +``` +{{ project_name }}/ +├── project/ +│ ├── __init__.py +│ ├── acp.py # Thin ACP server; FastACP auto-wires to the workflow +│ ├── workflow.py # Temporal workflow (durable conversation state) +│ ├── activities.py # run_codex_turn activity (CLI subprocess) +│ └── run_worker.py # Temporal worker entrypoint +├── Dockerfile +├── manifest.yaml +├── dev.ipynb +{% if use_uv %} +└── pyproject.toml +{% else %} +└── requirements.txt +{% endif %} +``` + +## Key Concepts + +### Subprocess must run in an activity +Temporal runs workflow + signal-handler bodies on a deterministic sandbox event +loop that does not implement `subprocess_exec`. The workflow therefore delegates +each turn to the `run_codex_turn` activity, which also gains Temporal's retry + +timeout guarantees. + +### Durable multi-turn memory +The codex thread ID is kept on the workflow instance; Temporal's durable replay +reconstructs it after a crash, so the next turn resumes the same codex session. + +### The unified harness surface +`CodexTurn` + `UnifiedEmitter` are the unified harness surface. The turn +normalizes CLI output into canonical AgentEx events; the emitter traces and +delivers them. + +## Development + +### 1. 
Choose a model +Set `CODEX_MODEL` (defaults to `o4-mini`) to control which model codex uses. + +### 2. Customize the subprocess +Edit `_spawn_codex` in `project/activities.py` to change the CLI flags or how +the prompt is delivered. + +### 3. Configure Credentials +Set your credentials via `manifest.yaml`, an exported environment variable, or a +`.env` file in the project directory. + +### 4. Run Locally +```bash +export ENVIRONMENT=development && agentex agents run --manifest manifest.yaml +``` diff --git a/src/agentex/lib/cli/templates/temporal-codex/dev.ipynb.j2 b/src/agentex/lib/cli/templates/temporal-codex/dev.ipynb.j2 new file mode 100644 index 000000000..d3a68303f --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-codex/dev.ipynb.j2 @@ -0,0 +1,126 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "36834357", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c309d6", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"{{ agent_name }}\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9f6e6ef0", + "metadata": {}, + "outputs": [], + "source": [ + "# (REQUIRED) Create a new task. 
For Async agents, you must create a task for messages to be associated with.\n", + "import uuid\n", + "\n", + "rpc_response = client.agents.create_task(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"name\": f\"{str(uuid.uuid4())[:8]}-task\",\n", + " \"params\": {}\n", + " }\n", + ")\n", + "\n", + "task = rpc_response.result\n", + "print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b03b0d37", + "metadata": {}, + "outputs": [], + "source": [ + "# Send an event to the agent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_event(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"task_id\": task.id,\n", + " }\n", + ")\n", + "\n", + "event = rpc_response.result\n", + "print(event)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6927cc0", + "metadata": {}, + "outputs": [], + "source": [ + "# Subscribe to the async task messages produced by the agent\n", + "from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages\n", + "\n", + "task_messages = subscribe_to_async_task_messages(\n", + " client=client,\n", + " task=task, \n", + " only_after_timestamp=event.created_at, \n", + " 
print_messages=True,\n", + " rich_print=True,\n", + " timeout=5,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4864e354", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/agentex/lib/cli/templates/temporal-codex/environments.yaml.j2 b/src/agentex/lib/cli/templates/temporal-codex/environments.yaml.j2 new file mode 100644 index 000000000..a3df5e228 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-codex/environments.yaml.j2 @@ -0,0 +1,64 @@ +# Agent Environment Configuration +# ------------------------------ +# This file defines environment-specific settings for your agent. +# This DIFFERS from the manifest.yaml file in that it is used to program things that are ONLY per environment. + +# ********** EXAMPLE ********** +# schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +# environments: +# dev: +# auth: +# principal: +# user_id: "1234567890" +# user_name: "John Doe" +# user_email: "john.doe@example.com" +# user_role: "admin" +# user_permissions: "read, write, delete" +# helm_overrides: # This is used to override the global helm values.yaml file in the agentex-agent helm charts +# replicas: 3 +# resources: +# requests: +# cpu: "1000m" +# memory: "2Gi" +# limits: +# cpu: "2000m" +# memory: "4Gi" +# env: +# - name: LOG_LEVEL +# value: "DEBUG" +# - name: ENVIRONMENT +# value: "staging" +# +# kubernetes: +# # OPTIONAL - Otherwise it will be derived from separately. 
However, this can be used to override the derived +# # namespace and deploy it within the same namespace that already exists for a separate agent. +# namespace: "team-{{agent_name}}" +# ********** END EXAMPLE ********** + +schema_version: "v1" # This is used to validate the file structure and is not used by the agentex CLI +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + # This is used to override the global helm values.yaml file in the agentex-agent helm charts + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal-worker: + enabled: true + replicaCount: 2 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git a/src/agentex/lib/cli/templates/temporal-codex/manifest.yaml.j2 b/src/agentex/lib/cli/templates/temporal-codex/manifest.yaml.j2 new file mode 100644 index 000000000..d2a3df5c1 --- /dev/null +++ b/src/agentex/lib/cli/templates/temporal-codex/manifest.yaml.j2 @@ -0,0 +1,142 @@ +# Agent Manifest Configuration +# --------------------------- +# This file defines how your agent should be built and deployed. + +# Build Configuration +# ------------------ +# The build config defines what gets packaged into your agent's Docker image. +# This same configuration is used whether building locally or remotely. +# +# When building: +# 1. All files from include_paths are collected into a build context +# 2. The context is filtered by dockerignore rules +# 3. The Dockerfile uses this context to build your agent's image +# 4. 
The image is pushed to a registry and used to run your agent +build: + context: + # Root directory for the build context + root: ../ # Keep this as the default root + + # Paths to include in the Docker build context + # Must include: + # - Your agent's directory (your custom agent code) + # These paths are collected and sent to the Docker daemon for building + include_paths: + - {{ project_path_from_build_root }} + + # Path to your agent's Dockerfile + # This defines how your agent's image is built from the context + # Relative to the root directory + dockerfile: {{ project_path_from_build_root }}/Dockerfile + + # Path to your agent's .dockerignore + # Filters unnecessary files from the build context + # Helps keep build context small and builds fast + dockerignore: {{ project_path_from_build_root }}/.dockerignore + + +# Local Development Configuration +# ----------------------------- +# Only used when running the agent locally +local_development: + agent: + port: 8000 # Port where your local ACP server is running + host_address: host.docker.internal # Host address for Docker networking (host.docker.internal for Docker, localhost for direct) + + # File paths for local development (relative to this manifest.yaml) + paths: + # Path to ACP server file + # Examples: + # project/acp.py (standard) + # src/server.py (custom structure) + # ../shared/acp.py (shared across projects) + # /absolute/path/acp.py (absolute path) + acp: project/acp.py + + # Path to temporal worker file + # Examples: + # project/run_worker.py (standard) + # workers/temporal.py (custom structure) + # ../shared/worker.py (shared across projects) + worker: project/run_worker.py + + +# Agent Configuration +# ----------------- +agent: + # Type of agent - either sync or async + acp_type: async + + # Unique name for your agent + # Used for task routing and monitoring + name: {{ agent_name }} + + # Description of what your agent does + # Helps with documentation and discovery + description: "{{ description 
}}" + + # Temporal workflow configuration + # This enables your agent to run as a Temporal workflow for long-running tasks + temporal: + enabled: true + workflows: + # Name of the workflow class + # Must match the @workflow.defn name in your workflow.py + - name: {{ workflow_name }} + + # Queue name for task distribution + # Used by Temporal to route tasks to your agent + # Convention: <agent_name>_task_queue + queue_name: {{ queue_name }} + + # Optional: Health check port for temporal worker + # Defaults to 80 if not specified + # health_check_port: 80 + + # Optional: Credentials mapping + # Maps Kubernetes secrets to environment variables + # Common credentials include: + credentials: + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + # The codex CLI spawned in project/activities.py reads OPENAI_API_KEY + # directly; without it every turn fails with an auth error. + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + + # Optional: Set Environment variables for running your agent locally as well + # as for deployment later on + env: {} + # LITELLM_API_KEY: "" + # OPENAI_BASE_URL: "" + # OPENAI_ORG_ID: "" + + +# Deployment Configuration +# ----------------------- +# Configuration for deploying your agent to Kubernetes clusters +deployment: + # Container image configuration + image: + repository: "" # Update with your container registry + tag: "latest" # Default tag, should be versioned in production + + imagePullSecrets: [] # Update with your image pull secret name + # - name: my-registry-secret + + # Global deployment settings that apply to all clusters + # These can be overridden in cluster-specific environments (environments.yaml) + global: + # Default replica count + replicaCount: 1 + + # Default resource requirements + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" \ No newline at end of file diff --git 
a/src/agentex/lib/cli/templates/temporal-codex/project/acp.py.j2 b/src/agentex/lib/cli/templates/temporal-codex/project/acp.py.j2
new file mode 100644
index 000000000..7ef5744f0
--- /dev/null
+++ b/src/agentex/lib/cli/templates/temporal-codex/project/acp.py.j2
@@ -0,0 +1,32 @@
+"""ACP server for {{ agent_name }} — a Temporal Codex harness agent.
+
+This file is intentionally thin. When ``acp_type="async"`` is combined with
+``TemporalACPConfig(type="temporal", ...)``, FastACP auto-wires:
+
+    HTTP task/create      -> @workflow.run on the workflow class
+    HTTP task/event/send  -> @workflow.signal(SignalName.RECEIVE_EVENT)
+    HTTP task/cancel      -> workflow cancellation via the Temporal client
+
+so we don't define any handlers here. The actual agent code lives in
+``project/workflow.py`` and is executed by the Temporal worker
+(``project/run_worker.py``), not by this HTTP process.
+"""
+
+from __future__ import annotations
+
+import os
+
+from dotenv import load_dotenv
+# NOTE(review): called before the agentex imports below — presumably so they see .env values at import time; confirm.
+load_dotenv()
+
+from agentex.lib.types.fastacp import TemporalACPConfig
+from agentex.lib.sdk.fastacp.fastacp import FastACP
+
+acp = FastACP.create(
+    acp_type="async",
+    config=TemporalACPConfig(
+        type="temporal",
+        temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
+    ),
+)
diff --git a/src/agentex/lib/cli/templates/temporal-codex/project/activities.py.j2 b/src/agentex/lib/cli/templates/temporal-codex/project/activities.py.j2
new file mode 100644
index 000000000..0111794d9
--- /dev/null
+++ b/src/agentex/lib/cli/templates/temporal-codex/project/activities.py.j2
@@ -0,0 +1,151 @@
+"""Temporal activity for {{ agent_name }} — Codex harness.
+
+Subprocess spawning (and any other I/O) must run inside a Temporal *activity*,
+not in workflow code. Temporal runs workflow + signal-handler bodies on a
+deterministic sandbox event loop that does not implement ``subprocess_exec``
+(or threads / sockets), so spawning ``codex exec`` directly in the signal
+handler raises ``NotImplementedError``.
This activity runs codex, drives the
+``CodexTurn`` through ``UnifiedEmitter.auto_send_turn`` (the async Redis push
+path), and returns the turn result to the workflow.
+
+The ``_spawn_codex`` / ``_process_stdout`` seams are injectable: offline tests
+can replace them with fakes that yield pre-recorded event lines so no real CLI
+runs.
+"""
+
+from __future__ import annotations
+
+import os
+import codecs
+import asyncio
+from typing import Any
+from datetime import datetime
+from collections.abc import AsyncIterator
+
+from temporalio import activity
+
+from agentex.lib.adk import CodexTurn
+from agentex.lib.core.harness import UnifiedEmitter
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.utils.model_utils import BaseModel
+
+logger = make_logger(__name__)
+
+RUN_CODEX_TURN_ACTIVITY = "run_codex_turn"
+
+
+class RunCodexTurnParams(BaseModel):
+    """Arguments for one codex turn run inside an activity."""
+
+    task_id: str  # task the emitter streams this turn's events to
+    prompt: str  # user text written to the codex process's stdin
+    model: str  # value passed to ``--model`` and to ``CodexTurn``
+    trace_id: str | None = None  # forwarded to ``UnifiedEmitter``
+    parent_span_id: str | None = None  # forwarded to ``UnifiedEmitter``
+    thread_id: str | None = None  # codex session to ``resume``; None starts fresh
+    created_at: datetime | None = None  # forwarded to ``auto_send_turn``
+
+
+class RunCodexTurnResult(BaseModel):
+    """Result returned from the activity to the workflow."""
+
+    final_text: str  # ``final_text`` of the ``auto_send_turn`` result
+    session_id: str | None = None  # ``turn.session_id`` after the turn ran
+    model: str | None = None  # ``turn.usage().model``
+
+
+async def _spawn_codex(
+    model: str,
+    thread_id: str | None = None,
+) -> asyncio.subprocess.Process:
+    """Spawn ``codex exec --json`` locally and return the live process.
+
+    Injection seam: tests replace this function with a fake that returns a
+    mock process whose stdout yields pre-recorded event lines.
+
+    The caller writes the prompt to stdin after the process starts, then
+    closes stdin so codex knows input is complete.
+    """
+    base_flags = [
+        "--json",
+        "--skip-git-repo-check",
+        "--dangerously-bypass-approvals-and-sandbox",
+        "--model",
+        model,
+    ]
+
+    if thread_id:
+        cmd = ["codex", "exec", *base_flags, "resume", thread_id, "-"]
+    else:
+        cmd = ["codex", "exec", *base_flags, "-"]
+
+    return await asyncio.create_subprocess_exec(
+        *cmd,
+        stdin=asyncio.subprocess.PIPE,
+        stdout=asyncio.subprocess.PIPE,
+        # Discard stderr: codex --json writes events to stdout; its stderr is
+        # progress/debug noise. Capturing it with PIPE but never reading it
+        # would deadlock once codex fills the OS pipe buffer (~64 KB).
+        stderr=asyncio.subprocess.DEVNULL,
+        env={**os.environ},
+    )
+
+
+async def _process_stdout(process: asyncio.subprocess.Process) -> AsyncIterator[str]:
+    """Yield newline-delimited JSON lines from the process stdout.
+
+    Uses an incremental UTF-8 decoder so a multibyte character split across two
+    4 KB reads is decoded correctly instead of being corrupted at the boundary.
+    """
+    assert process.stdout is not None
+    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
+    buffer = ""
+    while True:
+        chunk = await process.stdout.read(4096)
+        if not chunk:
+            break
+        buffer += decoder.decode(chunk)
+        while "\n" in buffer:
+            line, buffer = buffer.split("\n", 1)
+            line = line.strip()
+            if line:
+                yield line
+    buffer += decoder.decode(b"", final=True)
+    if buffer.strip():
+        yield buffer.strip()
+
+
+@activity.defn(name=RUN_CODEX_TURN_ACTIVITY)
+async def run_codex_turn(params: RunCodexTurnParams) -> dict[str, Any]:
+    """Run one codex turn end-to-end and stream events to the task.
+
+    Runs in an activity (real asyncio loop) so subprocess I/O is permitted.
+    """
+    process = await _spawn_codex(params.model, thread_id=params.thread_id)
+    # From here on everything runs under try/finally so the subprocess is reaped
+    # on every exit path, including a failed prompt write (codex already gone).
+    try:
+        assert process.stdin is not None
+        process.stdin.write(params.prompt.encode("utf-8"))
+        await process.stdin.drain()
+        process.stdin.close()
+        turn = CodexTurn(events=_process_stdout(process), model=params.model)
+        emitter = UnifiedEmitter(
+            task_id=params.task_id,
+            trace_id=params.trace_id,
+            parent_span_id=params.parent_span_id,
+        )
+        result = await emitter.auto_send_turn(turn, created_at=params.created_at)
+    finally:
+        if process.returncode is None:
+            process.kill()
+        await process.wait()
+    if process.returncode:
+        # stderr is discarded, so the exit status is the only failure signal.
+        logger.warning("codex exited with status %s", process.returncode)
+
+    return RunCodexTurnResult(
+        final_text=result.final_text,
+        session_id=turn.session_id,
+        model=turn.usage().model,
+    ).model_dump()
diff --git a/src/agentex/lib/cli/templates/temporal-codex/project/run_worker.py.j2 b/src/agentex/lib/cli/templates/temporal-codex/project/run_worker.py.j2
new file mode 100644
index 000000000..d86519977
--- /dev/null
+++ b/src/agentex/lib/cli/templates/temporal-codex/project/run_worker.py.j2
@@ -0,0 +1,41 @@
+"""Temporal worker for {{ agent_name }} — Codex harness.
+
+Run as a separate long-lived process alongside the ACP HTTP server. The
+worker polls Temporal for workflow + activity tasks and executes them.
+
+The codex CLI subprocess runs in the ``run_codex_turn`` activity (registered
+below alongside the built-in Agentex activities), because subprocess I/O is not
+permitted on the Temporal workflow event loop.
+"""
+
+import asyncio
+
+from project.workflow import {{ workflow_class }}
+from project.activities import run_codex_turn
+from agentex.lib.utils.debug import setup_debug_if_enabled
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.environment_variables import EnvironmentVariables
+from agentex.lib.core.temporal.activities import get_all_activities
+from agentex.lib.core.temporal.workers.worker import AgentexWorker
+
+environment_variables = EnvironmentVariables.refresh()
+logger = make_logger(__name__)
+
+
+async def main():
+    """Run an ``AgentexWorker`` on the queue named by ``WORKFLOW_TASK_QUEUE``."""
+    setup_debug_if_enabled()
+
+    queue = environment_variables.WORKFLOW_TASK_QUEUE
+    if queue is None:
+        raise ValueError("WORKFLOW_TASK_QUEUE is not set")
+
+    # The codex activity is registered next to the built-in Agentex activities.
+    await AgentexWorker(task_queue=queue).run(
+        activities=[run_codex_turn, *get_all_activities()],
+        workflow={{ workflow_class }},
+    )
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/agentex/lib/cli/templates/temporal-codex/project/workflow.py.j2 b/src/agentex/lib/cli/templates/temporal-codex/project/workflow.py.j2
new file mode 100644
index 000000000..39325ed60
--- /dev/null
+++ b/src/agentex/lib/cli/templates/temporal-codex/project/workflow.py.j2
@@ -0,0 +1,152 @@
+"""Temporal workflow for {{ agent_name }} — Codex harness.
+
+Demonstrates the ``convert_codex_to_agentex_events`` tap + ``CodexTurn`` +
+``UnifiedEmitter`` for a Temporal-durable ACP agent.
+
+KEY CONCEPTS DEMONSTRATED:
+- Running ``codex exec --json`` in the ``run_codex_turn`` activity. Subprocess
+  I/O is not permitted on the Temporal workflow event loop (the deterministic
+  sandbox loop does not implement ``subprocess_exec``), so the signal handler
+  delegates the turn to an activity, which also gets Temporal's retry + timeout
+  guarantees.
+- Wrapping the stdout line stream in a ``CodexTurn`` (inside the activity).
+- Delivering events via ``UnifiedEmitter.auto_send_turn``, which pushes
+  ``StreamTaskMessage*`` events to Redis so the UI sees tokens in real time.
+- Passing ``created_at=workflow.now()`` for deterministic timestamps under
+  Temporal replay (required for Temporal-safe delivery).
+- Persisting the codex thread ID on the workflow instance itself — Temporal's
+  workflow state is durable, so no external ``adk.state`` round-trip is needed.
+"""
+
+from __future__ import annotations
+
+import os
+import asyncio
+from datetime import timedelta
+
+from temporalio import workflow
+
+from agentex.lib import adk
+from agentex.lib.types.acp import SendEventParams, CreateTaskParams
+from agentex.lib.types.tracing import SGPTracingProcessorConfig
+from agentex.lib.utils.logging import make_logger
+from agentex.types.text_content import TextContent
+from agentex.lib.environment_variables import EnvironmentVariables
+from agentex.lib.core.temporal.types.workflow import SignalName
+from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
+from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config
+
+with workflow.unsafe.imports_passed_through():
+    from project.activities import RunCodexTurnParams, run_codex_turn
+
+add_tracing_processor_config(
+    SGPTracingProcessorConfig(
+        sgp_api_key=os.environ.get("SGP_API_KEY", ""),
+        sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""),
+        sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""),
+    )
+)
+
+environment_variables = EnvironmentVariables.refresh()
+
+if environment_variables.WORKFLOW_NAME is None:
+    raise ValueError("Environment variable WORKFLOW_NAME is not set")
+if environment_variables.AGENT_NAME is None:
+    raise ValueError("Environment variable AGENT_NAME is not set")
+
+logger = make_logger(__name__)
+
+MODEL = os.environ.get("CODEX_MODEL", "o4-mini")  # sent to the activity as ``model``
+
+
+@workflow.defn(name=environment_variables.WORKFLOW_NAME)
+class {{ workflow_class }}(BaseWorkflow):
+    """Long-running Temporal workflow that runs codex exec for each turn.
+
+    Conversation state (codex thread ID + turn counter) is kept on the
+    workflow instance. Temporal's durable replay reconstructs this state if
+    the worker crashes, so no external ``adk.state`` round-trip is needed.
+    """
+
+    def __init__(self) -> None:
+        super().__init__(display_name=environment_variables.AGENT_NAME)
+        self._complete_task = False  # set by complete_task_signal; ends on_task_create
+        self._turn_number = 0  # incremented per received event; used in the span name
+        self._codex_thread_id: str | None = None  # session id returned by the last turn
+        # Serialize turns: signal handlers can interleave at await points, so two
+        # quick messages could both read the same stale _codex_thread_id and fork
+        # the codex session. The lock keeps turns sequential and preserves
+        # conversation continuity.
+        self._turn_lock = asyncio.Lock()
+
+    @workflow.signal(name=SignalName.RECEIVE_EVENT)
+    async def on_task_event_send(self, params: SendEventParams) -> None:
+        """Handle a new user message by running one codex turn in the ``run_codex_turn`` activity."""
+        logger.info("Received task event: %s", params.task.id)
+        async with self._turn_lock:
+            self._turn_number += 1
+
+            await adk.messages.create(task_id=params.task.id, content=params.event.content)
+
+            user_message = params.event.content.content
+
+            async with adk.tracing.span(
+                trace_id=params.task.id,
+                task_id=params.task.id,
+                name=f"Turn {self._turn_number}",
+                input={"message": user_message},
+            ) as span:
+                # Delegate the subprocess turn to an activity: subprocess I/O is not
+                # permitted on the Temporal workflow event loop. The activity streams
+                # events to the task and returns the final text + codex thread id.
+                # workflow.now() gives a deterministic timestamp under replay.
+                result = await workflow.execute_activity(
+                    run_codex_turn,
+                    RunCodexTurnParams(
+                        task_id=params.task.id,
+                        prompt=user_message,
+                        model=MODEL,
+                        trace_id=params.task.id,
+                        parent_span_id=span.id if span else None,
+                        thread_id=self._codex_thread_id,
+                        created_at=workflow.now(),
+                    ),
+                    start_to_close_timeout=timedelta(minutes=5),
+                )
+
+                # Persist the codex thread id so the next turn resumes the session.
+                session_id = result.get("session_id")
+                if session_id:
+                    self._codex_thread_id = session_id
+
+                if span:
+                    span.output = {
+                        "final_text": result.get("final_text"),
+                        "model": result.get("model"),
+                    }
+
+    @workflow.run
+    async def on_task_create(self, params: CreateTaskParams) -> str:
+        """Workflow entry point — keep the conversation alive for incoming signals."""
+        logger.info("Task created: %s", params.task.id)
+
+        await adk.messages.create(
+            task_id=params.task.id,
+            content=TextContent(
+                author="agent",
+                content=(
+                    "Task initialized.\n"
+                    "Send me a message and I'll run codex (local subprocess) "
+                    "to answer, streaming events via the unified harness surface."
+                ),
+            ),
+        )
+
+        await workflow.wait_condition(lambda: self._complete_task, timeout=None)
+        return "Task completed"
+
+    @workflow.signal
+    async def complete_task_signal(self) -> None:
+        """Graceful workflow shutdown signal."""
+        logger.info("Received complete_task signal")
+        self._complete_task = True
diff --git a/src/agentex/lib/cli/templates/temporal-codex/pyproject.toml.j2 b/src/agentex/lib/cli/templates/temporal-codex/pyproject.toml.j2
new file mode 100644
index 000000000..2c6ec9c2f
--- /dev/null
+++ b/src/agentex/lib/cli/templates/temporal-codex/pyproject.toml.j2
@@ -0,0 +1,37 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "{{ project_name }}"
+version = "0.1.0"
+description = "{{ description }}"
+requires-python = ">=3.12"
+dependencies = [
+    "agentex-sdk",
+    "scale-gp",
+    "temporalio>=1.18.2",
+    "python-dotenv>=1.0,<2",
+]
+
+[project.optional-dependencies]
+dev = [
+    "pytest",
+    "pytest-asyncio",
+    "httpx",
+    "black",
+    "isort",
+    "flake8",
+    "debugpy>=1.8.15",
+]
+
+[tool.hatch.build.targets.wheel]
+packages = ["project"]
+
+[tool.black]
+line-length = 88
+target-version = ['py312']
+
+[tool.isort]
+profile = "black"
+line_length = 88
diff --git a/src/agentex/lib/cli/templates/temporal-codex/requirements.txt.j2 b/src/agentex/lib/cli/templates/temporal-codex/requirements.txt.j2
new file mode 100644
index 000000000..a060d2331
--- /dev/null
+++ b/src/agentex/lib/cli/templates/temporal-codex/requirements.txt.j2
@@ -0,0 +1,11 @@
+# Agentex SDK
+agentex-sdk
+
+# Scale GenAI Platform Python SDK
+scale-gp
+
+# Temporal workflow engine
+temporalio>=1.18.2
+
+# Loads .env files for local development
+python-dotenv>=1.0,<2