Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ ENV UV_HTTP_TIMEOUT=1000
WORKDIR /app/{{ project_path_from_build_root }}

# Copy dependency files for layer caching
COPY {{ project_path_from_build_root }}/pyproject.toml {{ project_path_from_build_root }}/uv.lock ./
COPY {{ project_path_from_build_root }}/pyproject.toml ./

# Install dependencies (without project itself, for layer caching)
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-install-project --no-dev
uv sync --no-install-project --no-dev

# Copy the project code
COPY {{ project_path_from_build_root }}/project ./project

# Install the project
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-dev
uv sync --no-dev

ENV PATH="/app/{{ project_path_from_build_root }}/.venv/bin:$PATH"
ENV PYTHONPATH=/app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ agent:

# Description of what your agent does
# Helps with documentation and discovery
description: {{ description }}
description: {{ description | tojson }}

# Temporal workflow configuration
# Set enabled: true to use Temporal workflows for long-running tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ from agentex.lib.core.harness import UnifiedEmitter
from agentex.lib.types.fastacp import AsyncACPConfig
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.types.text_content import TextContent
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config

Expand Down Expand Up @@ -134,7 +135,11 @@ async def handle_task_create(params: CreateTaskParams):
async def handle_task_event_send(params: SendEventParams):
"""Handle a user message: spawn Claude Code locally and push events to the task stream."""
task_id = params.task.id
prompt = params.event.content.content
content = params.event.content
if not isinstance(content, TextContent):
logger.warning("Ignoring non-text event content (type=%s)", getattr(content, "type", "?"))
return
prompt = content.content
logger.info("Processing message for task %s", task_id)

await adk.messages.create(task_id=task_id, content=params.event.content)
Expand Down
6 changes: 3 additions & 3 deletions src/agentex/lib/cli/templates/default-codex/Dockerfile-uv.j2
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ ENV UV_HTTP_TIMEOUT=1000
WORKDIR /app/{{ project_path_from_build_root }}

# Copy dependency files for layer caching
COPY {{ project_path_from_build_root }}/pyproject.toml {{ project_path_from_build_root }}/uv.lock ./
COPY {{ project_path_from_build_root }}/pyproject.toml ./

# Install dependencies (without project itself, for layer caching)
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-install-project --no-dev
uv sync --no-install-project --no-dev

# Copy the project code
COPY {{ project_path_from_build_root }}/project ./project

# Install the project
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-dev
uv sync --no-dev

ENV PATH="/app/{{ project_path_from_build_root }}/.venv/bin:$PATH"
ENV PYTHONPATH=/app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ agent:

# Description of what your agent does
# Helps with documentation and discovery
description: {{ description }}
description: {{ description | tojson }}

# Temporal workflow configuration
# Set enabled: true to use Temporal workflows for long-running tasks
Expand Down
179 changes: 109 additions & 70 deletions src/agentex/lib/cli/templates/default-codex/project/acp.py.j2
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ load_dotenv()
import agentex.lib.adk as adk
from agentex.lib.adk import CodexTurn
from agentex.lib.types.acp import SendEventParams, CancelTaskParams, CreateTaskParams
from agentex.types.text_content import TextContent
from agentex.lib.core.harness import UnifiedEmitter
from agentex.lib.types.fastacp import AsyncACPConfig
from agentex.lib.types.tracing import SGPTracingProcessorConfig
Expand All @@ -57,6 +58,20 @@ acp = FastACP.create(

MODEL = os.environ.get("CODEX_MODEL", "o4-mini")

# Serialize turns per task. Two ``task/event/send`` calls for the same task can
# otherwise both read the old ``codex_thread_id`` (or ``None``), run independent
# codex turns, and race to overwrite the stored thread id — forking the session.
# A per-task lock keeps turns sequential without blocking other tasks.
_task_locks: dict[str, asyncio.Lock] = {}


def _task_lock(task_id: str) -> asyncio.Lock:
lock = _task_locks.get(task_id)
if lock is None:
lock = asyncio.Lock()
_task_locks[task_id] = lock
return lock
Comment thread
declan-scale marked this conversation as resolved.


class ConversationState(BaseModel):
"""Per-task conversation state persisted via ``adk.state``.
Expand Down Expand Up @@ -150,81 +165,105 @@ async def handle_task_event_send(params: SendEventParams):
"""Handle each user message: spawn codex, stream events, save thread ID."""
task_id = params.task.id
agent_id = params.agent.id
user_message = params.event.content.content

logger.info("Processing message for task %s", task_id)

await adk.messages.create(task_id=task_id, content=params.event.content)

task_state = await adk.state.get_by_task_and_agent(task_id=task_id, agent_id=agent_id)
if task_state is None:
state = ConversationState()
task_state = await adk.state.create(task_id=task_id, agent_id=agent_id, state=state)
else:
state = ConversationState.model_validate(task_state.state)

state.turn_number += 1

async with adk.tracing.span(
trace_id=task_id,
task_id=task_id,
name=f"Turn {state.turn_number}",
input={"message": user_message},
data={"__span_type__": "AGENT_WORKFLOW"},
) as turn_span:
start_ms = int(time.monotonic() * 1000)

process = await _spawn_codex(MODEL, thread_id=state.codex_thread_id)

assert process.stdin is not None
process.stdin.write(user_message.encode("utf-8"))
await process.stdin.drain()
process.stdin.close()

turn = CodexTurn(
events=_process_stdout(process),
model=MODEL,
content = params.event.content
if not isinstance(content, TextContent):
logger.warning(
"Ignoring non-text event content (type=%s) for task %s",
getattr(content, "type", "?"),
task_id,
)
return
user_message = content.content

emitter = UnifiedEmitter(
task_id=task_id,
trace_id=task_id,
parent_span_id=turn_span.id if turn_span else None,
)
logger.info("Processing message for task %s", task_id)

# Guarantee the subprocess is reaped even if auto_send_turn raises
# (e.g. a Redis error); otherwise codex stays blocked writing to a full
# stdout pipe buffer and the OS process leaks until the server restarts.
try:
result = await emitter.auto_send_turn(turn)
finally:
if process.returncode is None:
process.kill()
await process.wait()

# Record the real wall-clock duration AFTER streaming completes; setting
# it before the stream ran would capture only subprocess spawn overhead.
turn.duration_ms = int(time.monotonic() * 1000) - start_ms

usage = turn.usage()

# Persist the codex session id (public accessor; valid post-stream) so the
# next turn resumes the same session.
if turn.session_id:
state.codex_thread_id = turn.session_id

await adk.state.update(
state_id=task_state.id,
# Serialize the whole turn (echo + the read-modify-write of
# ``codex_thread_id``) so two concurrent turns on the same task cannot fork
# the codex session or interleave their echoed messages.
lock = _task_lock(task_id)
await lock.acquire()
try:
# Echo inside the lock so this turn's message stays ordered with it.
await adk.messages.create(task_id=task_id, content=content)

task_state = await adk.state.get_by_task_and_agent(task_id=task_id, agent_id=agent_id)
if task_state is None:
state = ConversationState()
task_state = await adk.state.create(task_id=task_id, agent_id=agent_id, state=state)
else:
state = ConversationState.model_validate(task_state.state)

state.turn_number += 1

async with adk.tracing.span(
trace_id=task_id,
task_id=task_id,
agent_id=agent_id,
state=state,
)

if turn_span:
turn_span.output = {
"final_text": result.final_text,
"model": usage.model,
}
name=f"Turn {state.turn_number}",
input={"message": user_message},
data={"__span_type__": "AGENT_WORKFLOW"},
) as turn_span:
start_ms = int(time.monotonic() * 1000)

process = await _spawn_codex(MODEL, thread_id=state.codex_thread_id)

assert process.stdin is not None
process.stdin.write(user_message.encode("utf-8"))
await process.stdin.drain()
process.stdin.close()

turn = CodexTurn(
events=_process_stdout(process),
model=MODEL,
)

emitter = UnifiedEmitter(
task_id=task_id,
trace_id=task_id,
parent_span_id=turn_span.id if turn_span else None,
)

# Guarantee the subprocess is reaped even if auto_send_turn raises
# (e.g. a Redis error); otherwise codex stays blocked writing to a full
# stdout pipe buffer and the OS process leaks until the server restarts.
try:
result = await emitter.auto_send_turn(turn)
finally:
if process.returncode is None:
process.kill()
await process.wait()

# Record the real wall-clock duration AFTER streaming completes; setting
# it before the stream ran would capture only subprocess spawn overhead.
turn.duration_ms = int(time.monotonic() * 1000) - start_ms

usage = turn.usage()

# Persist the codex session id (public accessor; valid post-stream) so the
# next turn resumes the same session.
if turn.session_id:
state.codex_thread_id = turn.session_id

await adk.state.update(
state_id=task_state.id,
task_id=task_id,
agent_id=agent_id,
state=state,
)

if turn_span:
turn_span.output = {
"final_text": result.final_text,
"model": usage.model,
}
finally:
lock.release()
# Evict the lock once released and idle (unlocked, no waiters) so
# ``_task_locks`` stays bounded even if the turn raised. There is no
# await between ``_task_lock()`` and acquiring it, so an unlocked,
# waiter-free lock has no in-flight user.
if not lock.locked() and not getattr(lock, "_waiters", None):
_task_locks.pop(task_id, None)


@acp.on_task_cancel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ ENV UV_HTTP_TIMEOUT=1000
WORKDIR /app/{{ project_path_from_build_root }}

# Copy dependency files for layer caching
COPY {{ project_path_from_build_root }}/pyproject.toml {{ project_path_from_build_root }}/uv.lock ./
COPY {{ project_path_from_build_root }}/pyproject.toml ./

# Install dependencies (without project itself, for layer caching)
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-install-project --no-dev
uv sync --no-install-project --no-dev

# Copy the project code
COPY {{ project_path_from_build_root }}/project ./project

# Install the project
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-dev
uv sync --no-dev

ENV PATH="/app/{{ project_path_from_build_root }}/.venv/bin:$PATH"
ENV PYTHONPATH=/app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ agent:

# Description of what your agent does
# Helps with documentation and discovery
description: {{ description }}
description: {{ description | tojson }}

# Temporal workflow configuration
# Set enabled: true to use Temporal workflows for long-running tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ from agentex.protocol.acp import SendEventParams, CancelTaskParams, CreateTaskPa
from agentex.lib.types.fastacp import AsyncACPConfig
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.types.text_content import TextContent
from agentex.lib.adk import LangGraphTurn

from project.graph import create_graph
Expand Down Expand Up @@ -55,7 +56,11 @@ async def handle_task_event_send(params: SendEventParams):
"""Handle incoming events, streaming tokens and tool calls via Redis."""
graph = await get_graph()
task_id = params.task.id
user_message = params.event.content.content
content = params.event.content
if not isinstance(content, TextContent):
logger.warning("Ignoring non-text event content (type=%s)", getattr(content, "type", "?"))
return
user_message = content.content

logger.info(f"Processing message for thread {task_id}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ ENV UV_HTTP_TIMEOUT=1000
WORKDIR /app/{{ project_path_from_build_root }}

# Copy dependency files for layer caching
COPY {{ project_path_from_build_root }}/pyproject.toml {{ project_path_from_build_root }}/uv.lock ./
COPY {{ project_path_from_build_root }}/pyproject.toml ./

# Install dependencies (without project itself, for layer caching)
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-install-project --no-dev
uv sync --no-install-project --no-dev

# Copy the project code
COPY {{ project_path_from_build_root }}/project ./project

# Install the project
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-dev
uv sync --no-dev

ENV PATH="/app/{{ project_path_from_build_root }}/.venv/bin:$PATH"
ENV PYTHONPATH=/app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ agent:

# Description of what your agent does
# Helps with documentation and discovery
description: {{ description }}
description: {{ description | tojson }}

# Temporal workflow configuration
# Set enabled: true to use Temporal workflows for long-running tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ from agentex.lib.types.acp import SendEventParams, CancelTaskParams, CreateTaskP
from agentex.lib.types.fastacp import AsyncACPConfig
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.types.text_content import TextContent
from agentex.lib.utils.model_utils import BaseModel
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.core.harness.emitter import UnifiedEmitter
Expand Down Expand Up @@ -112,7 +113,11 @@ async def handle_task_event_send(params: SendEventParams):
agent = get_agent()
task_id = params.task.id
agent_id = params.agent.id
user_message = params.event.content.content
content = params.event.content
if not isinstance(content, TextContent):
logger.warning("Ignoring non-text event content (type=%s)", getattr(content, "type", "?"))
return
user_message = content.content

logger.info(f"Processing message for task {task_id}")

Expand Down
Loading
Loading