fix(agents-acp): persist user event message before forwarding to ACP#333

Open
declan-scale wants to merge 1 commit into main from declan-scale/AGX1-369-persist-user-event-message-before-forward

@declan-scale declan-scale commented Jun 23, 2026

Collaborator

Problem

In the custom-agents UI, the first turn of a brand-new task sometimes
renders an agent-authored message above the user's first message. The
expected order is user-input first, then everything the agent does in
response to it.

For agentic / async agents, the user's first input arrives as an
event/send RPC. Today the agent runtime is the one that materializes
that input into a TaskMessage after it processes the forwarded event.
That write races against any messages the agent emits from its
task-create handler (e.g. a startup "creating sandbox..." data row):

  • BSON Date is millisecond-precision, so two writes in the same
    millisecond have no defined order.
  • Even when the writes happen in the "right" order, the agent's startup
    row can be timestamped earlier than the user row because the event has
    to round-trip from the platform to the agent before the agent persists
    the user message.

The UI sorts by (created_at, _id) and renders strictly in that order,
so the agent visibly speaks before the user.
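
A minimal sketch of the tie described above, assuming the driver truncates timestamps to whole milliseconds on write (the helper `to_bson_millis` and the sample instants are illustrative, not part of this PR):

```python
from datetime import datetime, timedelta, timezone


def to_bson_millis(dt: datetime) -> datetime:
    """Drop sub-millisecond precision, mimicking how a BSON Date is stored.

    Assumption: truncation rather than rounding; either way two writes
    inside the same millisecond collapse to one stored value.
    """
    return dt.replace(microsecond=(dt.microsecond // 1000) * 1000)


base = datetime(2026, 6, 23, 0, 24, 0, 123000, tzinfo=timezone.utc)
user_written = base + timedelta(microseconds=100)   # user row written first
agent_written = base + timedelta(microseconds=900)  # agent row written 0.8 ms later

# In wall-clock terms the user row is earlier...
assert user_written < agent_written
# ...but once stored, both rows carry the same created_at, so the
# (created_at, _id) sort falls through to _id to break the tie.
assert to_bson_millis(user_written) == to_bson_millis(agent_written)
```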

Fix

In _handle_event_send, when the event payload is user-authored,
persist it as a TaskMessage on the platform side before forwarding
the event to ACP, with created_at clamped to:

max(datetime.now(UTC), latest_existing_message.created_at + 1ms)

The 1ms bump matches the staggering used by append_messages so the
stored ordering is durable against BSON's millisecond precision, and
the synchronous platform-side write closes the round-trip window — the
user row is in the DB before the agent ever sees the event.
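
The clamp can be sketched as a pure function. This is an illustration of the formula above, not the code in this PR; `clamp_created_at` and `BUMP` are hypothetical names:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

BUMP = timedelta(milliseconds=1)  # smallest step a BSON Date can represent


def clamp_created_at(now: datetime, latest_created_at: Optional[datetime]) -> datetime:
    """Return a timestamp that sorts strictly after the latest existing message."""
    if latest_created_at is None:
        # Empty task: nothing to order against, so wall-clock time is used as-is.
        return now
    return max(now, latest_created_at + BUMP)


now = datetime(2026, 6, 23, 0, 24, 0, tzinfo=timezone.utc)

# Latest row is stamped at the same instant: the user row is bumped by 1 ms.
assert clamp_created_at(now, now) == now + BUMP
# Latest row is comfortably in the past: wall-clock time wins.
assert clamp_created_at(now, now - timedelta(seconds=5)) == now
# No existing rows: wall-clock time, no bump.
assert clamp_created_at(now, None) == now
```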

Only user-authored payloads are persisted on this path. Agent- /
system-authored event payloads still flow through create_event_and_forward_to_acp
without producing a platform-side message row, so the agent runtime
remains the owner of agent message writes.

Tests

Added three unit tests under TestAgentsACPUseCase exercising the new
behavior against real Postgres + MongoDB testcontainers:

  • test_handle_event_send_persists_user_message_with_clamped_created_at:
    pre-seeds an agent-authored "sandbox create" row stamped at now,
    then sends a user-authored event at the same instant, and asserts the
    user row is timestamped strictly after the agent row by at least 1ms.
  • test_handle_event_send_persists_user_message_when_task_is_empty:
    no pre-existing messages — the user message is stamped with
    wall-clock, no clamp bump needed, and is the only message present.
  • test_handle_event_send_skips_persistence_for_agent_authored_event:
    an author=AGENT event payload is forwarded normally but does NOT
    produce a platform-side TaskMessage.

Out of scope / follow-ups

  • Per-task monotonic sequence number for messages with a dual sort key
    (schema change + backfill).
  • Resumable SSE via Last-Event-ID + removing the cache-skip on
    reconnect (separate ticket — fixes a sibling reconciliation bug).
  • Single reconciliation function in the UI to replace the ad-hoc merges
    in use-task-messages / use-task-subscription.
  • Server-side guarantee that streaming_status transitions to a
    terminal state on worker death / cancel.

Validation

  • Self-reviewed staged diff.
  • Did not run lint / typecheck / full test suite locally — relying on
    CI to enforce.

Test plan for reviewer

  • CI passes (ruff, mypy, unit + integration).
  • Manually create 20+ new tasks against an agentic agent that emits
    a startup data row and confirm the user message is always rendered
    first.
  • Confirm existing pagination, infinite scroll up, and tool-pair
    grouping behavior is unchanged for non-first-turn cases.
  • Confirm agent runtimes that previously persisted their own user
    message do not produce duplicate user rows (default AsyncBaseACP
    template does not; custom handlers should be audited).

Greptile Summary

Fixes a race condition in the custom-agents UI where an agent's startup message could appear before the user's first message by persisting the user-authored event payload as a TaskMessage on the platform side — with a max(now, latest + 1ms) clamped timestamp — before forwarding the event to ACP.

  • _handle_event_send now calls a new _append_user_event_message helper for author=USER payloads only; agent/system payloads are unchanged and still flow through create_event_and_forward_to_acp without producing a platform-side row.
  • Three new integration tests using real testcontainers verify the clamped timestamp, the empty-task case, and that agent-authored events are not persisted by the platform.

Confidence Score: 4/5

Safe to merge with awareness of the partial-failure scenario introduced on the new persistence path.

The core ordering fix is sound and well-tested. However, the new MongoDB write in _handle_event_send is not atomic with the subsequent Postgres event creation and ACP forward — if either of those steps fails, the user message row is durably written with no event sent to the agent, and a client retry will write a second user message. This is a real regression in the error path that was clean before this change.

The _handle_event_send method in agents_acp_use_case.py warrants a second look around error handling for the new MongoDB write step.
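
The partial-failure scenario can be reproduced in a toy model. The sketch below uses an in-memory list in place of MongoDB and a hypothetical `flaky_forward` callable in place of the Postgres write plus ACP forward; none of these names come from the PR:

```python
from typing import Callable, List


class ForwardError(Exception):
    """Stands in for any failure after the message write (hypothetical)."""


def handle_event_send(store: List[str], payload: str, forward: Callable[[str], None]) -> None:
    store.append(payload)  # step 1: platform-side message write (durable)
    forward(payload)       # step 2: event creation + forward; may raise


messages: List[str] = []
attempts = {"count": 0}


def flaky_forward(payload: str) -> None:
    # Fails on the first call only, like a transient outage.
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise ForwardError("forward failed")


try:
    handle_event_send(messages, "hello", flaky_forward)
except ForwardError:
    # The client retries the same request after the error.
    handle_event_send(messages, "hello", flaky_forward)

# The first attempt left a row behind, so the retry produced a duplicate.
assert messages == ["hello", "hello"]
```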

Important Files Changed

| Filename | Overview |
| --- | --- |
| agentex/src/domain/use_cases/agents_acp_use_case.py | Adds platform-side user message persistence in _handle_event_send before ACP forwarding; introduces a partial-failure window where the MongoDB write succeeds but the subsequent ACP forward can fail, leaving an orphaned user message that duplicates on retry. |
| agentex/tests/unit/use_cases/test_agents_acp_use_case.py | Adds three integration-style tests covering the clamped timestamp, empty-task, and agent-authored event cases; tests use real Postgres + MongoDB testcontainers and correctly assert ordering invariants. |

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Client
    participant Platform as _handle_event_send
    participant MongoDB
    participant Postgres
    participant ACP as Agent (ACP)

    Client->>Platform: "event/send (author=USER)"
    Platform->>MongoDB: _append_user_event_message (created_at clamped)
    MongoDB-->>Platform: TaskMessage persisted
    Platform->>Postgres: create_event_and_forward_to_acp
    Postgres-->>Platform: EventEntity created
    Platform->>ACP: send_event
    ACP-->>Platform: ack
    Platform-->>Client: EventEntity

    note over MongoDB,Postgres: If ACP forward fails, MongoDB row is orphaned and a client retry creates a duplicate user message

Reviews (2): Last reviewed commit: "fix(agents-acp): persist user event mess..." | Re-trigger Greptile

When a brand-new task receives its first event/send, the agent runtime
was the only thing materializing the user's TaskMessage — which races
against any messages the agent emits from its task-create handler (e.g.
a startup "creating sandbox..." data row). BSON Date is millisecond-
precision, so the two writes can land in the same millisecond with no
defined order, and even ordered writes can place the agent's startup row
first if event_send takes round-trip time to reach the agent. The
user-visible symptom is the agent appearing to speak before the user.

Write the user-authored event payload as a TaskMessage on the platform
side before forwarding to ACP, with created_at clamped to
max(now, latest_existing_created_at + 1ms). The 1ms bump matches the
staggering used by append_messages so the stored ordering is durable
under BSON's millisecond precision.

Only user-authored event payloads are persisted on this path; agent- and
system-authored event payloads still flow through as events without
producing a platform-side message row, so the agent runtime remains the
owner of agent message writes.
@declan-scale declan-scale requested a review from a team as a code owner June 23, 2026 00:24
@danielmillerp

Collaborator

hmm isnt this by design tho? we want to be able to modify the users messages? or nah is that like a configurable thing that wasn't worth it?

Comment on lines +885 to +899
latest = await self.task_message_service.get_messages(
    task_id=task_id,
    limit=1,
    page_number=1,
    order_by="created_at",
    order_direction="desc",
)
now = datetime.now(UTC)
if latest and latest[0].created_at is not None:
    candidate = latest[0].created_at + timedelta(milliseconds=1)
    clamped_created_at = max(now, candidate)
else:
    clamped_created_at = now

return await self.task_message_service.append_message(

P2 TOCTOU window in timestamp clamp

There is a read-then-write gap between get_messages (which fetches the latest created_at) and append_message (which writes with the clamped timestamp). If the agent emits another startup message between those two awaits, it will land with a wall-clock timestamp that is ≥ latest[0].created_at + 1ms, and the platform-written user message may end up with the same or an earlier millisecond than that new agent message — reproducing the original ordering ambiguity within the narrow window. The PR description already defers a complete fix (per-task monotonic sequence numbers) as a follow-up, so this is a known limitation, but it's worth making explicit so future work can close the gap.
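
One possible interleaving, written out with fixed timestamps. This is a hand-constructed illustration under the assumption that the agent's write lands in the same millisecond the platform reads its clock; the variable names are hypothetical:

```python
from datetime import datetime, timedelta, timezone


def clamp(now: datetime, latest: datetime) -> datetime:
    # Same rule as the reviewed snippet: max(now, latest + 1ms).
    return max(now, latest + timedelta(milliseconds=1))


t0 = datetime(2026, 6, 23, 0, 24, 0, tzinfo=timezone.utc)

latest_seen = t0                                 # 1. platform reads the latest row
late_agent_row = t0 + timedelta(milliseconds=5)  # 2. agent writes another startup row
now = t0 + timedelta(milliseconds=5)             # 3. platform reads its clock, same millisecond

user_created_at = clamp(now, latest_seen)        # 4. clamp uses the stale read from step 1

# The bump was computed against t0, not against the late agent row,
# so the two rows tie and their relative order is again undefined.
assert user_created_at == late_agent_row
```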

@declan-scale declan-scale reopened this Jun 23, 2026