Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions agentex/src/domain/use_cases/agents_acp_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import random
from collections.abc import AsyncIterator, Callable
from datetime import UTC, datetime, timedelta
from typing import Annotated, Any

from fastapi import Depends
Expand Down Expand Up @@ -832,6 +833,29 @@ async def _handle_event_send(
task = await self.task_service.get_task(
id=params.task_id, name=params.task_name
)

# Persist user-authored event payloads as a TaskMessage on the
# platform side BEFORE forwarding the event to the agent. Without
# this, the agent runtime is the one that materializes the user's
# TaskMessage when it processes the forwarded event, which races
# against any messages the agent emits from its task-create handler
# (e.g. a startup "creating sandbox..." data row). BSON Date is
# millisecond-precision, so two writes inside the same millisecond
# have no defined order — and even when the writes happen in order,
# the agent's startup row can land first if event_send takes any
# round-trip time to reach the agent. Writing here closes both
# windows: the user row exists in the DB before the agent gets the
# event, and its created_at is clamped strictly after every
# already-existing message so the row sorts deterministically when
# paginated by (created_at, _id).
if (
params.content is not None
and params.content.author == MessageAuthor.USER
):
await self._append_user_event_message(
task_id=task.id, content=params.content
)

# Create the event in the DB
event_entity = await self.task_service.create_event_and_forward_to_acp(
agent=agent,
Expand All @@ -842,5 +866,41 @@ async def _handle_event_send(
)
return event_entity

async def _append_user_event_message(
self,
task_id: str,
content: TaskMessageContentEntity,
) -> TaskMessageEntity:
"""
Persist a user-authored event payload as a TaskMessage with a
created_at that is strictly greater than every existing message's
created_at for this task.
The clamp is `max(datetime.now(UTC), latest_existing + 1ms)`. The
1ms bump matches the staggering used by ``append_messages`` so the
stored ordering is durable against BSON Date's millisecond
precision: two writes that would otherwise collide on the same
millisecond now sort deterministically.
"""
latest = await self.task_message_service.get_messages(
task_id=task_id,
limit=1,
page_number=1,
order_by="created_at",
order_direction="desc",
)
now = datetime.now(UTC)
if latest and latest[0].created_at is not None:
candidate = latest[0].created_at + timedelta(milliseconds=1)
clamped_created_at = max(now, candidate)
else:
clamped_created_at = now

return await self.task_message_service.append_message(
Comment on lines +885 to +899

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 TOCTOU window in timestamp clamp

There is a read-then-write gap between get_messages (which fetches the latest created_at) and append_message (which writes with the clamped timestamp). If the agent emits another startup message between those two awaits, it will land with a wall-clock timestamp that is ≥ latest[0].created_at + 1ms, and the platform-written user message may end up with the same or an earlier millisecond than that new agent message — reproducing the original ordering ambiguity within the narrow window. The PR description already defers a complete fix (per-task monotonic sequence numbers) as a follow-up, so this is a known limitation, but it's worth making explicit so future work can close the gap.

Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/domain/use_cases/agents_acp_use_case.py
Line: 885-899

Comment:
**TOCTOU window in timestamp clamp**

There is a read-then-write gap between `get_messages` (which fetches the latest `created_at`) and `append_message` (which writes with the clamped timestamp). If the agent emits another startup message between those two awaits, it will land with a wall-clock timestamp that is ≥ `latest[0].created_at + 1ms`, and the platform-written user message may end up with the same or an earlier millisecond than that new agent message — reproducing the original ordering ambiguity within the narrow window. The PR description already defers a complete fix (per-task monotonic sequence numbers) as a follow-up, so this is a known limitation, but it's worth making explicit so future work can close the gap.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Claude Code

task_id=task_id,
content=content,
created_at=clamped_created_at,
)


DAgentsACPUseCase = Annotated[AgentsACPUseCase, Depends(AgentsACPUseCase)]
209 changes: 209 additions & 0 deletions agentex/tests/unit/use_cases/test_agents_acp_use_case.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta
from unittest.mock import AsyncMock
from uuid import uuid4
from zoneinfo import ZoneInfo
Expand All @@ -19,6 +20,7 @@
StreamTaskMessageStartEntity,
)
from src.domain.entities.task_messages import (
DataContentEntity,
MessageAuthor,
MessageStyle,
TaskMessageContentType,
Expand Down Expand Up @@ -1900,6 +1902,213 @@ async def test_handle_event_send_error_no_task_specified(

assert "Either task_id or task_name must be provided" in str(exc_info.value)

async def test_handle_event_send_persists_user_message_with_clamped_created_at(
self,
agents_acp_use_case,
mock_http_gateway,
agent_repository,
task_service,
task_message_service,
sample_agent,
sample_text_content,
):
"""
AGX1-369: A user-authored event must materialize into a TaskMessage on
the platform side before the event is forwarded, with a created_at
strictly later than any already-existing message for that task.

This reproduces the same-millisecond race between an agent's startup
``data`` row (e.g. "Creating sandbox...") and the user's first
message: both can land at the same wall-clock millisecond, and BSON
Date is millisecond-precision so without an explicit clamp the
stored order is unspecified. The clamp guarantees the user message
sorts deterministically.
"""
# Given - Create agent and task first
await create_or_get_agent(agent_repository, sample_agent)
created_task = await task_service.create_task(
agent=sample_agent, task_name="event-ordering-task"
)

# Simulate the racing agent-authored startup message. We stamp it at
# *now* explicitly so the platform-side write of the user message
# would collide at the same millisecond if no clamp were applied.
race_instant = datetime.now(UTC)
sandbox_create = await task_message_service.append_message(
task_id=created_task.id,
content=DataContentEntity(
type=TaskMessageContentType.DATA,
author=MessageAuthor.AGENT,
style=MessageStyle.STATIC,
data={"event": "sandbox_create_started"},
),
created_at=race_instant,
)
assert sandbox_create.created_at == race_instant

# Mock ACP forward — we only care about the persistence path here.
async def mock_async_call(*args, **kwargs):
payload = kwargs.get("payload", {})
return {
"jsonrpc": "2.0",
"result": {"status": "event_sent", "event_id": "event-369"},
"id": payload.get("id", ""),
}

mock_http_gateway.async_call.side_effect = mock_async_call

# When
await agents_acp_use_case._handle_event_send(
agent=sample_agent,
params=SendEventRequest(
task_id=created_task.id,
content=sample_text_content, # author=USER fixture
),
acp_url=sample_agent.acp_url,
)

# Then - exactly two messages, in the expected order, and the user
# message's created_at is strictly greater than the agent message's.
messages_asc = await task_message_service.get_messages(
task_id=created_task.id,
limit=10,
page_number=1,
order_direction="asc",
)
assert len(messages_asc) == 2, (
f"Expected 1 pre-existing agent message + 1 platform-persisted "
f"user message, got {len(messages_asc)}"
)
assert messages_asc[0].id == sandbox_create.id
assert messages_asc[1].content.author == MessageAuthor.USER
assert messages_asc[1].content.content == sample_text_content.content
assert messages_asc[1].created_at is not None
assert messages_asc[0].created_at is not None
assert messages_asc[1].created_at > messages_asc[0].created_at, (
"User message must be timestamped strictly after every "
"pre-existing message so the (created_at, _id) sort key is "
"deterministic under BSON's millisecond precision."
)
# Defense-in-depth: enforce the >= +1ms gap the clamp promises.
assert (
messages_asc[1].created_at - messages_asc[0].created_at
>= timedelta(milliseconds=1)
)

async def test_handle_event_send_persists_user_message_when_task_is_empty(
self,
agents_acp_use_case,
mock_http_gateway,
agent_repository,
task_service,
task_message_service,
sample_agent,
sample_text_content,
):
"""
AGX1-369: When no prior messages exist for the task, the user
message is persisted with ``now`` and no clamp is needed — but the
message must still be visible in the list response before any
agent-emitted reply.
"""
await create_or_get_agent(agent_repository, sample_agent)
created_task = await task_service.create_task(
agent=sample_agent, task_name="event-ordering-task-empty"
)

async def mock_async_call(*args, **kwargs):
payload = kwargs.get("payload", {})
return {
"jsonrpc": "2.0",
"result": {"status": "event_sent", "event_id": "event-empty"},
"id": payload.get("id", ""),
}

mock_http_gateway.async_call.side_effect = mock_async_call

before = datetime.now(UTC)
await agents_acp_use_case._handle_event_send(
agent=sample_agent,
params=SendEventRequest(
task_id=created_task.id,
content=sample_text_content,
),
acp_url=sample_agent.acp_url,
)
after = datetime.now(UTC)

messages = await task_message_service.get_messages(
task_id=created_task.id,
limit=10,
page_number=1,
order_direction="asc",
)
assert len(messages) == 1
assert messages[0].content.author == MessageAuthor.USER
assert messages[0].created_at is not None
# Must be stamped with wall-clock at the time of the call (no clamp
# bump when there is nothing to clamp against).
assert before <= messages[0].created_at <= after

async def test_handle_event_send_skips_persistence_for_agent_authored_event(
self,
agents_acp_use_case,
mock_http_gateway,
agent_repository,
task_service,
task_message_service,
sample_agent,
):
"""
AGX1-369: Only user-authored events are materialized into a
TaskMessage on the platform. Agent- or system-authored event
payloads are still forwarded but must not produce a user-visible
message row on this path, to avoid duplicating the agent's own
writes.
"""
await create_or_get_agent(agent_repository, sample_agent)
created_task = await task_service.create_task(
agent=sample_agent, task_name="event-ordering-task-non-user"
)

agent_content = DataContentEntity(
type=TaskMessageContentType.DATA,
author=MessageAuthor.AGENT,
style=MessageStyle.STATIC,
data={"event": "system_signal"},
)

async def mock_async_call(*args, **kwargs):
payload = kwargs.get("payload", {})
return {
"jsonrpc": "2.0",
"result": {"status": "event_sent", "event_id": "event-sys"},
"id": payload.get("id", ""),
}

mock_http_gateway.async_call.side_effect = mock_async_call

await agents_acp_use_case._handle_event_send(
agent=sample_agent,
params=SendEventRequest(
task_id=created_task.id,
content=agent_content,
),
acp_url=sample_agent.acp_url,
)

messages = await task_message_service.get_messages(
task_id=created_task.id,
limit=10,
page_number=1,
order_direction="asc",
)
assert messages == [], (
"Agent-authored event payloads must not be persisted as "
"TaskMessages by the platform; the agent runtime owns those "
"writes."
)

async def test_handle_message_send_sync_with_task_name(
self,
agents_acp_use_case,
Expand Down
Loading