Skip to content

Commit a6bfc30

Browse files
declan-scaleclaude
andcommitted
fix(openai-agents): restore created_at deterministic timestamp (AGX1-378)
Thread the workflow-supplied created_at through UnifiedEmitter.auto_send_turn(turn, created_at=created_at) so the first agent message of the turn is stamped with the deterministic timestamp (e.g. workflow.now()) just as the original inline loop did before the unified-harness migration. The foundation (b4b8b33) wired auto_send_turn to accept and forward created_at to every streaming_task_message_context call. This commit connects the call site in run_agent_streamed_auto_send to that new parameter, restoring the behaviour that the migration comment documented as a known trade-off. Update the stale limitation comment to reflect the fix. Add test_run_agent_streamed_auto_send_forwards_created_at, which drives the activity through a fake stream with a pinned datetime and asserts every streaming context receives that datetime. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent fb887d1 commit a6bfc30

2 files changed

Lines changed: 83 additions & 13 deletions

File tree

src/agentex/lib/core/services/adk/providers/openai.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -742,18 +742,13 @@ async def run_agent_streamed_auto_send(
742742
) as span:
743743
heartbeat_if_in_workflow("run agent streamed auto send")
744744

745-
# created_at limitation: the unified auto_send path
746-
# (UnifiedEmitter.auto_send_turn -> auto_send) does not thread the
747-
# workflow-supplied created_at through to the per-message streaming
748-
# contexts it opens for the agent's turn. The previous inline loop
749-
# stamped the first message of the turn with workflow.now() to win
750-
# the race against the workflow's user-echo at the server; under the
751-
# unified surface that first agent message instead falls back to the
752-
# server's wall clock. This is an accepted trade-off for migrating
753-
# onto the shared harness; if strict first-message ordering becomes
754-
# necessary, auto_send must accept and dispense a created_at.
755-
# The dispenser is still used below for the guardrail-rejection
756-
# messages, which open their own streaming contexts directly.
745+
# AGX1-378 restored: created_at is now threaded through
746+
# UnifiedEmitter.auto_send_turn -> auto_send -> every
747+
# streaming_task_message_context call, so the first agent message of
748+
# the turn is stamped with the workflow-supplied timestamp (e.g.
749+
# workflow.now()) just as the original inline loop did.
750+
# The dispenser is still used below for guardrail-rejection messages,
751+
# which open their own streaming contexts directly.
757752
_take_created_at = _make_created_at_dispenser(created_at)
758753

759754
async with mcp_server_context(mcp_server_params, mcp_timeout_seconds) as servers:
@@ -817,7 +812,7 @@ async def run_agent_streamed_auto_send(
817812
)
818813

819814
try:
820-
await emitter.auto_send_turn(turn)
815+
await emitter.auto_send_turn(turn, created_at=created_at)
821816

822817
except InputGuardrailTripwireTriggered as e:
823818
# Handle guardrail trigger by sending a rejection message

tests/lib/adk/providers/test_openai_activities.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,48 @@ def _assert_tools_conversion(self, starting_agent, tools_case, _original_tools):
618618
else:
619619
raise ValueError(f"Unknown tools_case: {tools_case}")
620620

621+
@patch("agents.Runner.run_streamed")
622+
async def test_run_agent_streamed_auto_send_forwards_created_at(self, mock_runner_run_streamed):
623+
"""created_at is forwarded to every streaming context opened by auto_send_turn (AGX1-378)."""
624+
from datetime import datetime, timezone
625+
626+
from agentex.lib.core.temporal.activities.adk.providers.openai_activities import (
627+
RunAgentStreamedAutoSendParams,
628+
)
629+
630+
deterministic_ts = datetime(2025, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
631+
632+
mock_streaming_result = self._create_streaming_result_mock()
633+
634+
async def _no_events():
635+
return
636+
yield # make it an async generator
637+
638+
mock_streaming_result.stream_events = _no_events
639+
mock_runner_run_streamed.return_value = mock_streaming_result
640+
641+
mock_tracer = self._create_mock_tracer()
642+
openai_service, openai_activities, env = self._create_test_setup(mock_tracer)
643+
mock_ctx, recorded_created_ats = self._setup_streaming_service_mocks_with_created_at(openai_service)
644+
645+
params = RunAgentStreamedAutoSendParams(
646+
input_list=[{"role": "user", "content": "hello"}],
647+
mcp_server_params=[],
648+
agent_name="test_agent",
649+
agent_instructions="You are a helpful assistant",
650+
trace_id="test-trace-id",
651+
parent_span_id="test-span-id",
652+
task_id="test-task-id",
653+
created_at=deterministic_ts,
654+
)
655+
656+
await env.run(openai_activities.run_agent_streamed_auto_send, params)
657+
658+
assert all(ts == deterministic_ts for ts in recorded_created_ats), (
659+
f"Expected all streaming contexts to receive created_at={deterministic_ts!r}, "
660+
f"got: {recorded_created_ats!r}"
661+
)
662+
621663
def _setup_streaming_service_mocks(self, openai_service):
622664
"""Helper method to setup streaming service mocks for run_agent_auto_send."""
623665
from unittest.mock import AsyncMock
@@ -665,6 +707,39 @@ async def mock_streaming_context_manager(*_args, **kwargs):
665707

666708
return mock_streaming_context
667709

710+
def _setup_streaming_service_mocks_with_created_at(self, openai_service):
711+
"""Like _setup_streaming_service_mocks but also records every created_at kwarg."""
712+
from contextlib import asynccontextmanager
713+
from unittest.mock import AsyncMock
714+
715+
from agentex.types.task_message import TaskMessage
716+
717+
mock_streaming_service = AsyncMock()
718+
mock_agentex_client = AsyncMock()
719+
720+
mock_streaming_context = AsyncMock()
721+
mock_task_message = Mock(spec=TaskMessage)
722+
mock_task_message.id = "test-task-message-id"
723+
mock_task_message.task_id = "test-task-id"
724+
mock_task_message.content = {"type": "text", "content": "test"}
725+
mock_streaming_context.task_message = mock_task_message
726+
mock_streaming_context.stream_update = AsyncMock()
727+
728+
recorded_created_ats: list = []
729+
730+
@asynccontextmanager
731+
async def mock_ctx_manager(*_args, **kwargs):
732+
recorded_created_ats.append(kwargs.get("created_at"))
733+
yield mock_streaming_context
734+
735+
mock_streaming_service.streaming_task_message_context = mock_ctx_manager
736+
mock_streaming_context.opened_contents = []
737+
738+
openai_service.streaming_service = mock_streaming_service
739+
openai_service.agentex_client = mock_agentex_client
740+
741+
return mock_streaming_context, recorded_created_ats
742+
668743
def _create_code_interpreter_tool_call_mock(self, call_id="code_interpreter_call_123"):
669744
"""Helper to create ResponseCodeInterpreterToolCall mock objects."""
670745
return ResponseCodeInterpreterToolCall(

0 commit comments

Comments
 (0)