diff --git a/agentex/src/domain/use_cases/agents_acp_use_case.py b/agentex/src/domain/use_cases/agents_acp_use_case.py index bc99111a..3eee137b 100644 --- a/agentex/src/domain/use_cases/agents_acp_use_case.py +++ b/agentex/src/domain/use_cases/agents_acp_use_case.py @@ -2,6 +2,7 @@ import json import random from collections.abc import AsyncIterator, Callable +from datetime import UTC, datetime, timedelta from typing import Annotated, Any from fastapi import Depends @@ -832,6 +833,29 @@ async def _handle_event_send( task = await self.task_service.get_task( id=params.task_id, name=params.task_name ) + + # Persist user-authored event payloads as a TaskMessage on the + # platform side BEFORE forwarding the event to the agent. Without + # this, the agent runtime is the one that materializes the user's + # TaskMessage when it processes the forwarded event, which races + # against any messages the agent emits from its task-create handler + # (e.g. a startup "creating sandbox..." data row). BSON Date is + # millisecond-precision, so two writes inside the same millisecond + # have no defined order — and even when the writes happen in order, + # the agent's startup row can land first if event_send takes any + # round-trip time to reach the agent. Writing here closes both + # windows: the user row exists in the DB before the agent gets the + # event, and its created_at is clamped strictly after every + # already-existing message so the row sorts deterministically when + # paginated by (created_at, _id). + if ( + params.content is not None + and params.content.author == MessageAuthor.USER + ): + await self._append_user_event_message( + task_id=task.id, content=params.content + ) + # Create the event in the DB event_entity = await self.task_service.create_event_and_forward_to_acp( agent=agent, @@ -842,5 +866,41 @@ async def _handle_event_send( ) return event_entity + async def _append_user_event_message( + self, + task_id: str, + content: TaskMessageContentEntity, + ) -> TaskMessageEntity: + """ + Persist a user-authored event payload as a TaskMessage with a + created_at that is strictly greater than every existing message's + created_at for this task. + + The clamp is `max(datetime.now(UTC), latest_existing + 1ms)`. The + 1ms bump matches the staggering used by ``append_messages`` so the + stored ordering is durable against BSON Date's millisecond + precision: two writes that would otherwise collide on the same + millisecond now sort deterministically. + """ + latest = await self.task_message_service.get_messages( + task_id=task_id, + limit=1, + page_number=1, + order_by="created_at", + order_direction="desc", + ) + now = datetime.now(UTC) + if latest and latest[0].created_at is not None: + candidate = latest[0].created_at + timedelta(milliseconds=1) + clamped_created_at = max(now, candidate) + else: + clamped_created_at = now + + return await self.task_message_service.append_message( + task_id=task_id, + content=content, + created_at=clamped_created_at, + ) + DAgentsACPUseCase = Annotated[AgentsACPUseCase, Depends(AgentsACPUseCase)] diff --git a/agentex/tests/unit/use_cases/test_agents_acp_use_case.py b/agentex/tests/unit/use_cases/test_agents_acp_use_case.py index abb7af0b..d92bf136 100644 --- a/agentex/tests/unit/use_cases/test_agents_acp_use_case.py +++ b/agentex/tests/unit/use_cases/test_agents_acp_use_case.py @@ -1,3 +1,4 @@ +from datetime import datetime, timedelta from unittest.mock import AsyncMock from uuid import uuid4 from zoneinfo import ZoneInfo @@ -19,6 +20,7 @@ StreamTaskMessageStartEntity, ) from src.domain.entities.task_messages import ( + DataContentEntity, MessageAuthor, MessageStyle, TaskMessageContentType, @@ -1900,6 +1902,213 @@ async def test_handle_event_send_error_no_task_specified( assert "Either task_id or task_name must be provided" in str(exc_info.value) + async def test_handle_event_send_persists_user_message_with_clamped_created_at( + self, + agents_acp_use_case, + mock_http_gateway, + agent_repository, + task_service, + task_message_service, + sample_agent, + sample_text_content, + ): + """ + AGX1-369: A user-authored event must materialize into a TaskMessage on + the platform side before the event is forwarded, with a created_at + strictly later than any already-existing message for that task. + + This reproduces the same-millisecond race between an agent's startup + ``data`` row (e.g. "Creating sandbox...") and the user's first + message: both can land at the same wall-clock millisecond, and BSON + Date is millisecond-precision so without an explicit clamp the + stored order is unspecified. The clamp guarantees the user message + sorts deterministically. + """ + # Given - Create agent and task first + await create_or_get_agent(agent_repository, sample_agent) + created_task = await task_service.create_task( + agent=sample_agent, task_name="event-ordering-task" + ) + + # Simulate the racing agent-authored startup message. We stamp it at + # *now* explicitly so the platform-side write of the user message + # would collide at the same millisecond if no clamp were applied. + race_instant = datetime.now(UTC) + sandbox_create = await task_message_service.append_message( + task_id=created_task.id, + content=DataContentEntity( + type=TaskMessageContentType.DATA, + author=MessageAuthor.AGENT, + style=MessageStyle.STATIC, + data={"event": "sandbox_create_started"}, + ), + created_at=race_instant, + ) + assert sandbox_create.created_at == race_instant + + # Mock ACP forward — we only care about the persistence path here. + async def mock_async_call(*args, **kwargs): + payload = kwargs.get("payload", {}) + return { + "jsonrpc": "2.0", + "result": {"status": "event_sent", "event_id": "event-369"}, + "id": payload.get("id", ""), + } + + mock_http_gateway.async_call.side_effect = mock_async_call + + # When + await agents_acp_use_case._handle_event_send( + agent=sample_agent, + params=SendEventRequest( + task_id=created_task.id, + content=sample_text_content, # author=USER fixture + ), + acp_url=sample_agent.acp_url, + ) + + # Then - exactly two messages, in the expected order, and the user + # message's created_at is strictly greater than the agent message's. + messages_asc = await task_message_service.get_messages( + task_id=created_task.id, + limit=10, + page_number=1, + order_direction="asc", + ) + assert len(messages_asc) == 2, ( + f"Expected 1 pre-existing agent message + 1 platform-persisted " + f"user message, got {len(messages_asc)}" + ) + assert messages_asc[0].id == sandbox_create.id + assert messages_asc[1].content.author == MessageAuthor.USER + assert messages_asc[1].content.content == sample_text_content.content + assert messages_asc[1].created_at is not None + assert messages_asc[0].created_at is not None + assert messages_asc[1].created_at > messages_asc[0].created_at, ( + "User message must be timestamped strictly after every " + "pre-existing message so the (created_at, _id) sort key is " + "deterministic under BSON's millisecond precision." + ) + # Defense-in-depth: enforce the >= +1ms gap the clamp promises. + assert ( + messages_asc[1].created_at - messages_asc[0].created_at + >= timedelta(milliseconds=1) + ) + + async def test_handle_event_send_persists_user_message_when_task_is_empty( + self, + agents_acp_use_case, + mock_http_gateway, + agent_repository, + task_service, + task_message_service, + sample_agent, + sample_text_content, + ): + """ + AGX1-369: When no prior messages exist for the task, the user + message is persisted with ``now`` and no clamp is needed — but the + message must still be visible in the list response before any + agent-emitted reply. + """ + await create_or_get_agent(agent_repository, sample_agent) + created_task = await task_service.create_task( + agent=sample_agent, task_name="event-ordering-task-empty" + ) + + async def mock_async_call(*args, **kwargs): + payload = kwargs.get("payload", {}) + return { + "jsonrpc": "2.0", + "result": {"status": "event_sent", "event_id": "event-empty"}, + "id": payload.get("id", ""), + } + + mock_http_gateway.async_call.side_effect = mock_async_call + + before = datetime.now(UTC) + await agents_acp_use_case._handle_event_send( + agent=sample_agent, + params=SendEventRequest( + task_id=created_task.id, + content=sample_text_content, + ), + acp_url=sample_agent.acp_url, + ) + after = datetime.now(UTC) + + messages = await task_message_service.get_messages( + task_id=created_task.id, + limit=10, + page_number=1, + order_direction="asc", + ) + assert len(messages) == 1 + assert messages[0].content.author == MessageAuthor.USER + assert messages[0].created_at is not None + # Must be stamped with wall-clock at the time of the call (no clamp + # bump when there is nothing to clamp against). + assert before <= messages[0].created_at <= after + + async def test_handle_event_send_skips_persistence_for_agent_authored_event( + self, + agents_acp_use_case, + mock_http_gateway, + agent_repository, + task_service, + task_message_service, + sample_agent, + ): + """ + AGX1-369: Only user-authored events are materialized into a + TaskMessage on the platform. Agent- or system-authored event + payloads are still forwarded but must not produce a user-visible + message row on this path, to avoid duplicating the agent's own + writes. + """ + await create_or_get_agent(agent_repository, sample_agent) + created_task = await task_service.create_task( + agent=sample_agent, task_name="event-ordering-task-non-user" + ) + + agent_content = DataContentEntity( + type=TaskMessageContentType.DATA, + author=MessageAuthor.AGENT, + style=MessageStyle.STATIC, + data={"event": "system_signal"}, + ) + + async def mock_async_call(*args, **kwargs): + payload = kwargs.get("payload", {}) + return { + "jsonrpc": "2.0", + "result": {"status": "event_sent", "event_id": "event-sys"}, + "id": payload.get("id", ""), + } + + mock_http_gateway.async_call.side_effect = mock_async_call + + await agents_acp_use_case._handle_event_send( + agent=sample_agent, + params=SendEventRequest( + task_id=created_task.id, + content=agent_content, + ), + acp_url=sample_agent.acp_url, + ) + + messages = await task_message_service.get_messages( + task_id=created_task.id, + limit=10, + page_number=1, + order_direction="asc", + ) + assert messages == [], ( + "Agent-authored event payloads must not be persisted as " + "TaskMessages by the platform; the agent runtime owns those " + "writes." + ) + async def test_handle_message_send_sync_with_task_name( self, agents_acp_use_case,