From 61c14e3c33970abfa442f7cc556c7060629d1a83 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 10:00:42 -0400 Subject: [PATCH 1/2] refactor(harness): move the OpenAI harness into adk/_modules + export at facade Promote the OpenAI Turn and sync tap to the canonical adk/_modules layout (_openai_turn.py, _openai_sync.py), matching every other harness. Leave back-compat shims at providers/_modules/{openai_turn,sync_provider}.py so existing import paths keep working; the larger Temporal/MCP provider stays under adk.providers. Export OpenAITurn, openai_usage_to_turn_usage and convert_openai_to_agentex_events from agentex.lib.adk so the OpenAI harness is a first-class facade citizen like ClaudeCodeTurn / CodexTurn (fixes Greptile follow-up: templates can import from the package facade instead of a private module path). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/agentex/lib/adk/__init__.py | 6 + src/agentex/lib/adk/_modules/_openai_sync.py | 371 ++++++++++++++++++ src/agentex/lib/adk/_modules/_openai_turn.py | 134 +++++++ .../lib/adk/providers/_modules/openai_turn.py | 138 +------ .../adk/providers/_modules/sync_provider.py | 363 +---------------- .../lib/core/services/adk/providers/openai.py | 9 +- .../adk/providers/test_openai_activities.py | 2 +- tests/lib/adk/providers/test_openai_turn.py | 4 +- 8 files changed, 534 insertions(+), 493 deletions(-) create mode 100644 src/agentex/lib/adk/_modules/_openai_sync.py create mode 100644 src/agentex/lib/adk/_modules/_openai_turn.py diff --git a/src/agentex/lib/adk/__init__.py b/src/agentex/lib/adk/__init__.py index 23c81690a..4d79be9dd 100644 --- a/src/agentex/lib/adk/__init__.py +++ b/src/agentex/lib/adk/__init__.py @@ -13,6 +13,8 @@ ) from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn, stream_pydantic_ai_events from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events +from agentex.lib.adk._modules._openai_sync import convert_openai_to_agentex_events +from agentex.lib.adk._modules._openai_turn import OpenAITurn, openai_usage_to_turn_usage from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events from agentex.lib.adk._modules._claude_code_turn import ( ClaudeCodeTurn, @@ -74,6 +76,10 @@ "stream_pydantic_ai_events", "convert_pydantic_ai_to_agentex_events", "PydanticAITurn", + # OpenAI Agents + "convert_openai_to_agentex_events", + "OpenAITurn", + "openai_usage_to_turn_usage", # Claude Code "convert_claude_code_to_agentex_events", "ClaudeCodeTurn", diff --git a/src/agentex/lib/adk/_modules/_openai_sync.py b/src/agentex/lib/adk/_modules/_openai_sync.py new file mode 100644 index 000000000..f357b881b --- /dev/null +++ b/src/agentex/lib/adk/_modules/_openai_sync.py @@ -0,0 +1,371 @@ +"""Sync OpenAI Agents SDK streaming tap for Agentex. + +Converts an OpenAI Agents SDK streamed run (``Runner.run_streamed(...)`` +``stream_events()``) into Agentex ``StreamTaskMessage*`` events, including +reasoning content and reasoning summary deltas for reasoning models (o1/o3/gpt-5). + +This is the lower-level primitive used by ``OpenAITurn`` (in +``_openai_turn.py``). New OpenAI Agents integrations should prefer wrapping a +``Runner.run_streamed`` result in ``OpenAITurn`` and driving delivery + tracing +through ``UnifiedEmitter``. +""" + +from __future__ import annotations + +from typing import Any + +from openai.types.responses import ( + ResponseTextDeltaEvent, + ResponseFunctionToolCall, + ResponseFunctionWebSearch, + ResponseOutputItemDoneEvent, + ResponseOutputItemAddedEvent, + ResponseCodeInterpreterToolCall, + ResponseReasoningSummaryPartAddedEvent, + ResponseReasoningSummaryTextDeltaEvent, +) +from openai.types.responses.response_reasoning_text_done_event import ResponseReasoningTextDoneEvent +from openai.types.responses.response_reasoning_text_delta_event import ResponseReasoningTextDeltaEvent +from openai.types.responses.response_reasoning_summary_text_done_event import ResponseReasoningSummaryTextDoneEvent + +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.task_message_content import TextContent +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta + + +def _extract_tool_call_info(tool_call_item: Any) -> tuple[str, str, dict[str, Any]]: + """ + Extract call_id, tool_name, and tool_arguments from a tool call item. + Args: + tool_call_item: The tool call item to process + Returns: + A tuple of (call_id, tool_name, tool_arguments) + """ + # Generic handling for different tool call types + # Try 'call_id' first, then 'id', then generate placeholder + if hasattr(tool_call_item, "call_id"): + call_id = tool_call_item.call_id + elif hasattr(tool_call_item, "id"): + call_id = tool_call_item.id + else: + call_id = f"unknown_call_{id(tool_call_item)}" + + if isinstance(tool_call_item, ResponseFunctionWebSearch): + tool_name = "web_search" + tool_arguments = {"action": tool_call_item.action.model_dump(), "status": tool_call_item.status} + elif isinstance(tool_call_item, ResponseCodeInterpreterToolCall): + tool_name = "code_interpreter" + tool_arguments = {"code": tool_call_item.code, "status": tool_call_item.status} + elif isinstance(tool_call_item, ResponseFunctionToolCall): + # Handle standard function tool calls + tool_name = tool_call_item.name + # Handle the arguments field which might be a string or None + if tool_call_item.arguments: + if isinstance(tool_call_item.arguments, str): + import json + + tool_arguments = json.loads(tool_call_item.arguments) if tool_call_item.arguments else {} + else: + tool_arguments = tool_call_item.arguments + else: + tool_arguments = {} + else: + # Generic handling for any tool call type + tool_name = getattr(tool_call_item, "name", type(tool_call_item).__name__) + # Handle the arguments field which might be a string or None + if hasattr(tool_call_item, "arguments"): + arguments = tool_call_item.arguments + if isinstance(arguments, str): + import json + + tool_arguments = json.loads(arguments) if arguments else {} + elif arguments is None: + tool_arguments = {} + else: + tool_arguments = arguments + else: + tool_arguments = tool_call_item.model_dump() + + return call_id, tool_name, tool_arguments + + +def _extract_tool_response_info(tool_map: dict[str, Any], tool_output_item: Any) -> tuple[str, str, str]: + """ + Extract call_id, tool_name, and content from a tool output item. + Args: + tool_map: Dictionary mapping call_ids to tool names + tool_output_item: The tool output item to process + Returns: + A tuple of (call_id, tool_name, content) + """ + + # Handle different formats of tool_output_item + if isinstance(tool_output_item, dict): + call_id = tool_output_item.get("call_id", tool_output_item.get("id", f"unknown_call_{id(tool_output_item)}")) + content = tool_output_item.get("output", str(tool_output_item)) + else: + # Try to get call_id from attributes + if hasattr(tool_output_item, "call_id"): + call_id = tool_output_item.call_id + elif hasattr(tool_output_item, "id"): + call_id = tool_output_item.id + else: + call_id = f"unknown_call_{id(tool_output_item)}" + + # Get content + if hasattr(tool_output_item, "output"): + content = tool_output_item.output + else: + content = str(tool_output_item) + + # Get tool name from map + tool_name = tool_map.get(call_id, "unknown_tool") + + return call_id, tool_name, content + + +async def convert_openai_to_agentex_events(stream_response): + """Convert OpenAI streaming events to AgentEx TaskMessageUpdate events with reasoning support. + + This is an enhanced version of the base converter that includes support for: + - Reasoning content deltas (for o1 models) + - Reasoning summary deltas (for o1 models) + + Args: + stream_response: An async iterator of OpenAI streaming events + Yields: + TaskMessageUpdate: AgentEx streaming events (StreamTaskMessageDelta, StreamTaskMessageFull, or StreamTaskMessageDone) + """ + + tool_map = {} + event_count = 0 + message_index = 0 # Track message index for proper sequencing + seen_tool_output = False # Track if we've seen tool output to know when final text starts + item_id_to_index = {} # Map item_id to message index + item_id_to_type = {} # Map item_id to content type (text, reasoning_content, reasoning_summary) + + async for event in stream_response: + event_count += 1 + + # Check for raw response events which contain the actual OpenAI streaming events + if hasattr(event, "type") and event.type == "raw_response_event": + if hasattr(event, "data"): + raw_event = event.data + + # Check for ResponseOutputItemAddedEvent which signals a new message starting + if isinstance(raw_event, ResponseOutputItemAddedEvent): + # Don't increment here - we'll increment when we see the actual text delta + # This is just a signal that a new message is starting + pass + + # Handle item completion - send done event to close the message + elif isinstance(raw_event, ResponseOutputItemDoneEvent): + item_id = raw_event.item.id + if item_id in item_id_to_index: + # Get the message type to decide whether to send done event + message_type = item_id_to_type.get(item_id, "text") + + # Don't send done events for reasoning content/summary + # They just end with their last delta + if message_type not in ("reasoning_content", "reasoning_summary"): + yield StreamTaskMessageDone( + type="done", + index=item_id_to_index[item_id], + ) + + # Skip reasoning summary part added events - we handle them on delta + elif isinstance(raw_event, ResponseReasoningSummaryPartAddedEvent): + pass + + # Handle reasoning summary text delta events + elif isinstance(raw_event, ResponseReasoningSummaryTextDeltaEvent): + item_id = raw_event.item_id + summary_index = raw_event.summary_index + + # If this is a new item_id we haven't seen, create a new message + if item_id and item_id not in item_id_to_index: + message_index += 1 + item_id_to_index[item_id] = message_index + item_id_to_type[item_id] = "reasoning_summary" + + # Send a start event for this new reasoning summary message. + # The start content must be ReasoningContent (not TextContent) + # so consumers that branch on the start event's content type + # render a reasoning/thinking indicator; the final persisted + # content is rebuilt from the reasoning deltas regardless. + yield StreamTaskMessageStart( + type="start", + index=item_id_to_index[item_id], + content=ReasoningContent( + type="reasoning", + author="agent", + summary=[], + content=[], + style="active", + ), + ) + + # Use the index for this item_id + current_index = item_id_to_index.get(item_id, message_index) + + # Yield reasoning summary delta + yield StreamTaskMessageDelta( + type="delta", + index=current_index, + delta=ReasoningSummaryDelta( + type="reasoning_summary", + summary_index=summary_index, + summary_delta=raw_event.delta, + ), + ) + + # Handle reasoning summary text done events + elif isinstance(raw_event, ResponseReasoningSummaryTextDoneEvent): + # We do NOT close the streaming context here + # as there can be multiple reasoning summaries. + # The context will be closed when the entire + # output item is done (ResponseOutputItemDoneEvent) + pass + + # Handle reasoning content text delta events + elif isinstance(raw_event, ResponseReasoningTextDeltaEvent): + item_id = raw_event.item_id + content_index = raw_event.content_index + + # If this is a new item_id we haven't seen, create a new message + if item_id and item_id not in item_id_to_index: + message_index += 1 + item_id_to_index[item_id] = message_index + item_id_to_type[item_id] = "reasoning_content" + + # Send a start event for this new reasoning content message. + # The start content must be ReasoningContent (not TextContent) + # so consumers that branch on the start event's content type + # render a reasoning/thinking indicator; the final persisted + # content is rebuilt from the reasoning deltas regardless. + yield StreamTaskMessageStart( + type="start", + index=item_id_to_index[item_id], + content=ReasoningContent( + type="reasoning", + author="agent", + summary=[], + content=[], + style="active", + ), + ) + + # Use the index for this item_id + current_index = item_id_to_index.get(item_id, message_index) + + # Yield reasoning content delta + yield StreamTaskMessageDelta( + type="delta", + index=current_index, + delta=ReasoningContentDelta( + type="reasoning_content", + content_index=content_index, + content_delta=raw_event.delta, + ), + ) + + # Handle reasoning content text done events + elif isinstance(raw_event, ResponseReasoningTextDoneEvent): + # We do NOT close the streaming context here + # as there can be multiple reasoning content texts. + # The context will be closed when the entire + # output item is done (ResponseOutputItemDoneEvent) + pass + + # Check if this is a text delta event from OpenAI + elif isinstance(raw_event, ResponseTextDeltaEvent): + # Check if this event has an item_id + item_id = getattr(raw_event, "item_id", None) + + # If this is a new item_id we haven't seen, it's a new message + if item_id and item_id not in item_id_to_index: + # Check if this is truly a NEW text message after tools + # We need to differentiate between the first text and the final text after tools + if seen_tool_output: + # This is the final text message after tool execution + message_index += 1 + item_id_to_index[item_id] = message_index + else: + item_id_to_index[item_id] = message_index + + item_id_to_type[item_id] = "text" + + # Send a start event with empty content for this new text message + yield StreamTaskMessageStart( + type="start", + index=item_id_to_index[item_id], + content=TextContent( + type="text", + author="agent", + content="", # Start with empty content, deltas will fill it + ), + ) + + # Use the index for this item_id + current_index = item_id_to_index.get(item_id, message_index) + + delta_message = StreamTaskMessageDelta( + type="delta", + index=current_index, + delta=TextDelta( + type="text", + text_delta=raw_event.delta, + ), + ) + yield delta_message + + elif hasattr(event, "type") and event.type == "run_item_stream_event": + # Skip reasoning_item events - they're handled via raw_response_event above + if hasattr(event, "item") and event.item.type == "reasoning_item": + continue + + # Check for tool_call_item type (this is when a tool is being called) + elif hasattr(event, "item") and event.item.type == "tool_call_item": + # Extract tool call information using the helper method + call_id, tool_name, tool_arguments = _extract_tool_call_info(event.item.raw_item) + tool_map[call_id] = tool_name + tool_request_content = ToolRequestContent( + tool_call_id=call_id, + name=tool_name, + arguments=tool_arguments, + author="agent", + ) + message_index += 1 # Increment for new message + yield StreamTaskMessageFull( + index=message_index, + type="full", + content=tool_request_content, + ) + + # Check for tool_call_output_item type (this is when a tool returns output) + elif hasattr(event, "item") and event.item.type == "tool_call_output_item": + # Extract tool response information using the helper method + call_id, tool_name, content = _extract_tool_response_info(tool_map, event.item.raw_item) + tool_response_content = ToolResponseContent( + tool_call_id=call_id, + name=tool_name, + content=content, + author="agent", + ) + message_index += 1 # Increment for new message + seen_tool_output = True # Mark that we've seen tool output so next text gets new index + yield StreamTaskMessageFull( + type="full", + index=message_index, + content=tool_response_content, + ) diff --git a/src/agentex/lib/adk/_modules/_openai_turn.py b/src/agentex/lib/adk/_modules/_openai_turn.py new file mode 100644 index 000000000..cfb1ce22d --- /dev/null +++ b/src/agentex/lib/adk/_modules/_openai_turn.py @@ -0,0 +1,134 @@ +"""OpenAITurn: adapt an OpenAI Agents SDK streamed run onto the harness surface. + +A ``HarnessTurn`` exposes a single canonical ``StreamTaskMessage*`` stream plus +normalized usage. ``OpenAITurn`` wraps a ``RunResultStreaming`` (from +``Runner.run_streamed``), converts its native OpenAI events into the canonical +stream via ``convert_openai_to_agentex_events``, and after exhaustion reads the +run's ``raw_responses`` to aggregate usage into a provider-independent +``TurnUsage``. + +Delivery (yield vs auto-send) and tracing are owned by ``UnifiedEmitter``; this +module is purely the provider->canonical adapter. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, AsyncIterator + +from agents.usage import Usage + +from agentex.lib.utils.logging import make_logger +from agentex.lib.core.harness.types import TurnUsage, StreamTaskMessage +from agentex.lib.adk._modules._openai_sync import ( + convert_openai_to_agentex_events, +) + +if TYPE_CHECKING: + from agents import ModelResponse, RunResultStreaming + +logger = make_logger(__name__) + + +def openai_usage_to_turn_usage(usage: Usage | None, model: str | None) -> TurnUsage: + """Map an ``agents.Usage`` to a harness-independent ``TurnUsage``. + + All field access is defensive (``getattr(..., None)``): different model + backends populate different subsets of the usage object, and real zeros are + valid values (e.g. 0 output tokens on a pure cache hit), so we never coerce + a present-but-zero value into ``None``. + """ + if usage is None: + return TurnUsage(model=model) + + input_details = getattr(usage, "input_tokens_details", None) + output_details = getattr(usage, "output_tokens_details", None) + + return TurnUsage( + model=model, + num_llm_calls=getattr(usage, "requests", None) or 0, + input_tokens=getattr(usage, "input_tokens", None), + cached_input_tokens=getattr(input_details, "cached_tokens", None), + output_tokens=getattr(usage, "output_tokens", None), + reasoning_tokens=getattr(output_details, "reasoning_tokens", None), + total_tokens=getattr(usage, "total_tokens", None), + ) + + +def _aggregate_usage(raw_responses: list[ModelResponse]) -> Usage | None: + """Sum the per-response ``Usage`` across a run's ``ModelResponse`` list. + + Returns ``None`` when no response carries usage so the caller can emit a + usage object with only the model name set. ``Usage.add`` accumulates + requests/tokens (including cached/reasoning detail fields). + """ + total: Usage | None = None + for response in raw_responses: + resp_usage = getattr(response, "usage", None) + if resp_usage is None: + continue + if total is None: + total = Usage() + total.add(resp_usage) + return total + + +class OpenAITurn: + """A single OpenAI Agents SDK turn adapted to the ``HarnessTurn`` protocol. + + Construct with exactly one of: + - ``result``: a ``RunResultStreaming`` from ``Runner.run_streamed``. Its + ``stream_events()`` is converted to the canonical stream, and after the + stream is exhausted ``raw_responses`` is read to compute usage. + - ``stream``: a pre-built async iterator of canonical ``StreamTaskMessage`` + events (bypasses ``convert_openai_to_agentex_events``). Useful for tests + and for callers that have already produced canonical events. Usage stays + at ``TurnUsage(model=...)`` because there is no run to read usage from. + + ``coalesce_tool_requests`` is accepted for API parity with other provider + turns but is a no-op for OpenAI: the OpenAI converter already emits a single + ``Full(ToolRequestContent)`` per tool call rather than streamed argument + deltas, so there is nothing to coalesce. + """ + + def __init__( + self, + result: RunResultStreaming | None = None, + model: str | None = None, + stream: AsyncIterator[StreamTaskMessage] | None = None, + coalesce_tool_requests: bool = False, # noqa: ARG002 - API parity, no-op for OpenAI + ) -> None: + if result is None and stream is None: + raise ValueError("OpenAITurn requires either `result` or `stream`") + self._result = result + self._model = model + self._stream = stream + self._usage: TurnUsage = TurnUsage(model=model) + + @property + def events(self) -> AsyncIterator[StreamTaskMessage]: + return self._iter_events() + + async def _iter_events(self) -> AsyncIterator[StreamTaskMessage]: + if self._stream is not None: + async for event in self._stream: + yield event + return + + result = self._result + assert result is not None # guaranteed by __init__ + async for event in convert_openai_to_agentex_events(result.stream_events()): + yield event + + # Stream is exhausted: the run has finished and raw_responses is now + # populated, so usage can be aggregated and normalized. + try: + raw_responses: list[Any] = list(getattr(result, "raw_responses", None) or []) + aggregated = _aggregate_usage(raw_responses) + self._usage = openai_usage_to_turn_usage(aggregated, self._model) + except Exception as exc: # pragma: no cover - defensive: never break delivery on usage + logger.warning(f"Failed to aggregate OpenAI usage: {exc}") + self._usage = TurnUsage(model=self._model) + + def usage(self) -> TurnUsage: + """Normalized turn usage. Valid only after ``events`` is exhausted.""" + return self._usage diff --git a/src/agentex/lib/adk/providers/_modules/openai_turn.py b/src/agentex/lib/adk/providers/_modules/openai_turn.py index 17a6518ee..320642dfc 100644 --- a/src/agentex/lib/adk/providers/_modules/openai_turn.py +++ b/src/agentex/lib/adk/providers/_modules/openai_turn.py @@ -1,134 +1,12 @@ -"""OpenAITurn: adapt an OpenAI Agents SDK streamed run onto the harness surface. +"""Back-compat shim: ``OpenAITurn`` and ``openai_usage_to_turn_usage`` now live +in ``agentex.lib.adk._modules._openai_turn``. -A ``HarnessTurn`` exposes a single canonical ``StreamTaskMessage*`` stream plus -normalized usage. ``OpenAITurn`` wraps a ``RunResultStreaming`` (from -``Runner.run_streamed``), converts its native OpenAI events into the canonical -stream via ``convert_openai_to_agentex_events``, and after exhaustion reads the -run's ``raw_responses`` to aggregate usage into a provider-independent -``TurnUsage``. - -Delivery (yield vs auto-send) and tracing are owned by ``UnifiedEmitter``; this -module is purely the provider->canonical adapter. +Existing importers of +``agentex.lib.adk.providers._modules.openai_turn.{OpenAITurn,openai_usage_to_turn_usage}`` +keep working. """ -from __future__ import annotations - -from typing import TYPE_CHECKING, Any, AsyncIterator - -from agents.usage import Usage - -from agentex.lib.utils.logging import make_logger -from agentex.lib.core.harness.types import TurnUsage, StreamTaskMessage -from agentex.lib.adk.providers._modules.sync_provider import ( - convert_openai_to_agentex_events, +from agentex.lib.adk._modules._openai_turn import ( # noqa: F401 + OpenAITurn, + openai_usage_to_turn_usage, ) - -if TYPE_CHECKING: - from agents import ModelResponse, RunResultStreaming - -logger = make_logger(__name__) - - -def openai_usage_to_turn_usage(usage: Usage | None, model: str | None) -> TurnUsage: - """Map an ``agents.Usage`` to a harness-independent ``TurnUsage``. - - All field access is defensive (``getattr(..., None)``): different model - backends populate different subsets of the usage object, and real zeros are - valid values (e.g. 0 output tokens on a pure cache hit), so we never coerce - a present-but-zero value into ``None``. - """ - if usage is None: - return TurnUsage(model=model) - - input_details = getattr(usage, "input_tokens_details", None) - output_details = getattr(usage, "output_tokens_details", None) - - return TurnUsage( - model=model, - num_llm_calls=getattr(usage, "requests", None) or 0, - input_tokens=getattr(usage, "input_tokens", None), - cached_input_tokens=getattr(input_details, "cached_tokens", None), - output_tokens=getattr(usage, "output_tokens", None), - reasoning_tokens=getattr(output_details, "reasoning_tokens", None), - total_tokens=getattr(usage, "total_tokens", None), - ) - - -def _aggregate_usage(raw_responses: list[ModelResponse]) -> Usage | None: - """Sum the per-response ``Usage`` across a run's ``ModelResponse`` list. - - Returns ``None`` when no response carries usage so the caller can emit a - usage object with only the model name set. ``Usage.add`` accumulates - requests/tokens (including cached/reasoning detail fields). - """ - total: Usage | None = None - for response in raw_responses: - resp_usage = getattr(response, "usage", None) - if resp_usage is None: - continue - if total is None: - total = Usage() - total.add(resp_usage) - return total - - -class OpenAITurn: - """A single OpenAI Agents SDK turn adapted to the ``HarnessTurn`` protocol. - - Construct with exactly one of: - - ``result``: a ``RunResultStreaming`` from ``Runner.run_streamed``. Its - ``stream_events()`` is converted to the canonical stream, and after the - stream is exhausted ``raw_responses`` is read to compute usage. - - ``stream``: a pre-built async iterator of canonical ``StreamTaskMessage`` - events (bypasses ``convert_openai_to_agentex_events``). Useful for tests - and for callers that have already produced canonical events. Usage stays - at ``TurnUsage(model=...)`` because there is no run to read usage from. - - ``coalesce_tool_requests`` is accepted for API parity with other provider - turns but is a no-op for OpenAI: the OpenAI converter already emits a single - ``Full(ToolRequestContent)`` per tool call rather than streamed argument - deltas, so there is nothing to coalesce. - """ - - def __init__( - self, - result: RunResultStreaming | None = None, - model: str | None = None, - stream: AsyncIterator[StreamTaskMessage] | None = None, - coalesce_tool_requests: bool = False, # noqa: ARG002 - API parity, no-op for OpenAI - ) -> None: - if result is None and stream is None: - raise ValueError("OpenAITurn requires either `result` or `stream`") - self._result = result - self._model = model - self._stream = stream - self._usage: TurnUsage = TurnUsage(model=model) - - @property - def events(self) -> AsyncIterator[StreamTaskMessage]: - return self._iter_events() - - async def _iter_events(self) -> AsyncIterator[StreamTaskMessage]: - if self._stream is not None: - async for event in self._stream: - yield event - return - - result = self._result - assert result is not None # guaranteed by __init__ - async for event in convert_openai_to_agentex_events(result.stream_events()): - yield event - - # Stream is exhausted: the run has finished and raw_responses is now - # populated, so usage can be aggregated and normalized. - try: - raw_responses: list[Any] = list(getattr(result, "raw_responses", None) or []) - aggregated = _aggregate_usage(raw_responses) - self._usage = openai_usage_to_turn_usage(aggregated, self._model) - except Exception as exc: # pragma: no cover - defensive: never break delivery on usage - logger.warning(f"Failed to aggregate OpenAI usage: {exc}") - self._usage = TurnUsage(model=self._model) - - def usage(self) -> TurnUsage: - """Normalized turn usage. Valid only after ``events`` is exhausted.""" - return self._usage diff --git a/src/agentex/lib/adk/providers/_modules/sync_provider.py b/src/agentex/lib/adk/providers/_modules/sync_provider.py index d1d5e1c09..86696a2b5 100644 --- a/src/agentex/lib/adk/providers/_modules/sync_provider.py +++ b/src/agentex/lib/adk/providers/_modules/sync_provider.py @@ -14,37 +14,11 @@ TResponseInputItem, AgentOutputSchemaBase, ) -from openai.types.responses import ( - ResponseTextDeltaEvent, - ResponseFunctionToolCall, - ResponseFunctionWebSearch, - ResponseOutputItemDoneEvent, - ResponseOutputItemAddedEvent, - ResponseCodeInterpreterToolCall, - ResponseReasoningSummaryPartAddedEvent, - ResponseReasoningSummaryTextDeltaEvent, -) from agents.models.openai_provider import OpenAIProvider -from openai.types.responses.response_reasoning_text_done_event import ResponseReasoningTextDoneEvent -from openai.types.responses.response_reasoning_text_delta_event import ResponseReasoningTextDeltaEvent -from openai.types.responses.response_reasoning_summary_text_done_event import ResponseReasoningSummaryTextDoneEvent from agentex import AsyncAgentex from agentex.lib.utils.logging import make_logger from agentex.lib.core.tracing.tracer import AsyncTracer -from agentex.types.reasoning_content import ReasoningContent -from agentex.types.task_message_delta import TextDelta -from agentex.types.task_message_update import ( - StreamTaskMessageDone, - StreamTaskMessageFull, - StreamTaskMessageDelta, - StreamTaskMessageStart, -) -from agentex.types.task_message_content import TextContent -from agentex.types.tool_request_content import ToolRequestContent -from agentex.types.tool_response_content import ToolResponseContent -from agentex.types.reasoning_content_delta import ReasoningContentDelta -from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta logger = make_logger(__name__) @@ -95,10 +69,10 @@ class SyncStreamingModel(Model): .. deprecated:: Prefer the unified harness surface for new OpenAI Agents integrations: wrap a ``Runner.run_streamed`` result in - ``agentex.lib.adk.providers._modules.openai_turn.OpenAITurn`` and drive + ``agentex.lib.adk._modules._openai_turn.OpenAITurn`` and drive delivery + tracing through ``UnifiedEmitter`` (see the - ``060_harness_openai`` / ``130_harness_openai`` / ``140_harness_openai`` - tutorials). This per-model tracing wrapper predates the harness and is + ``050_openai_agents`` / ``120_openai_agents`` tutorials). This + per-model tracing wrapper predates the harness and is retained only for backwards compatibility; it will be removed in a future release. No runtime warning is emitted. """ @@ -406,329 +380,8 @@ def get_model(self, model_name: Optional[str] = None) -> Model: return wrapped_model -def _extract_tool_call_info(tool_call_item: Any) -> tuple[str, str, dict[str, Any]]: - """ - Extract call_id, tool_name, and tool_arguments from a tool call item. - Args: - tool_call_item: The tool call item to process - Returns: - A tuple of (call_id, tool_name, tool_arguments) - """ - # Generic handling for different tool call types - # Try 'call_id' first, then 'id', then generate placeholder - if hasattr(tool_call_item, "call_id"): - call_id = tool_call_item.call_id - elif hasattr(tool_call_item, "id"): - call_id = tool_call_item.id - else: - call_id = f"unknown_call_{id(tool_call_item)}" - - if isinstance(tool_call_item, ResponseFunctionWebSearch): - tool_name = "web_search" - tool_arguments = {"action": tool_call_item.action.model_dump(), "status": tool_call_item.status} - elif isinstance(tool_call_item, ResponseCodeInterpreterToolCall): - tool_name = "code_interpreter" - tool_arguments = {"code": tool_call_item.code, "status": tool_call_item.status} - elif isinstance(tool_call_item, ResponseFunctionToolCall): - # Handle standard function tool calls - tool_name = tool_call_item.name - # Handle the arguments field which might be a string or None - if tool_call_item.arguments: - if isinstance(tool_call_item.arguments, str): - import json - - tool_arguments = json.loads(tool_call_item.arguments) if tool_call_item.arguments else {} - else: - tool_arguments = tool_call_item.arguments - else: - tool_arguments = {} - else: - # Generic handling for any tool call type - tool_name = getattr(tool_call_item, "name", type(tool_call_item).__name__) - # Handle the arguments field which might be a string or None - if hasattr(tool_call_item, "arguments"): - arguments = tool_call_item.arguments - if isinstance(arguments, str): - import json - - tool_arguments = json.loads(arguments) if arguments else {} - elif arguments is None: - tool_arguments = {} - else: - tool_arguments = arguments - else: - tool_arguments = tool_call_item.model_dump() - - return call_id, tool_name, tool_arguments - - -def _extract_tool_response_info(tool_map: dict[str, Any], tool_output_item: Any) -> tuple[str, str, str]: - """ - Extract call_id, tool_name, and content from a tool output item. - Args: - tool_map: Dictionary mapping call_ids to tool names - tool_output_item: The tool output item to process - Returns: - A tuple of (call_id, tool_name, content) - """ - - # Handle different formats of tool_output_item - if isinstance(tool_output_item, dict): - call_id = tool_output_item.get("call_id", tool_output_item.get("id", f"unknown_call_{id(tool_output_item)}")) - content = tool_output_item.get("output", str(tool_output_item)) - else: - # Try to get call_id from attributes - if hasattr(tool_output_item, "call_id"): - call_id = tool_output_item.call_id - elif hasattr(tool_output_item, "id"): - call_id = tool_output_item.id - else: - call_id = f"unknown_call_{id(tool_output_item)}" - - # Get content - if hasattr(tool_output_item, "output"): - content = tool_output_item.output - else: - content = str(tool_output_item) - - # Get tool name from map - tool_name = tool_map.get(call_id, "unknown_tool") - - return call_id, tool_name, content - - -async def convert_openai_to_agentex_events(stream_response): - """Convert OpenAI streaming events to AgentEx TaskMessageUpdate events with reasoning support. - - This is an enhanced version of the base converter that includes support for: - - Reasoning content deltas (for o1 models) - - Reasoning summary deltas (for o1 models) - - Args: - stream_response: An async iterator of OpenAI streaming events - Yields: - TaskMessageUpdate: AgentEx streaming events (StreamTaskMessageDelta, StreamTaskMessageFull, or StreamTaskMessageDone) - """ - - tool_map = {} - event_count = 0 - message_index = 0 # Track message index for proper sequencing - seen_tool_output = False # Track if we've seen tool output to know when final text starts - item_id_to_index = {} # Map item_id to message index - item_id_to_type = {} # Map item_id to content type (text, reasoning_content, reasoning_summary) - - async for event in stream_response: - event_count += 1 - - # Check for raw response events which contain the actual OpenAI streaming events - if hasattr(event, "type") and event.type == "raw_response_event": - if hasattr(event, "data"): - raw_event = event.data - - # Check for ResponseOutputItemAddedEvent which signals a new message starting - if isinstance(raw_event, ResponseOutputItemAddedEvent): - # Don't increment here - we'll increment when we see the actual text delta - # This is just a signal that a new message is starting - pass - - # Handle item completion - send done event to close the message - elif isinstance(raw_event, ResponseOutputItemDoneEvent): - item_id = raw_event.item.id - if item_id in item_id_to_index: - # Get the message type to decide whether to send done event - message_type = item_id_to_type.get(item_id, "text") - - # Don't send done events for reasoning content/summary - # They just end with their last delta - if message_type not in ("reasoning_content", "reasoning_summary"): - yield StreamTaskMessageDone( - type="done", - index=item_id_to_index[item_id], - ) - - # Skip reasoning summary part added events - we handle them on delta - elif isinstance(raw_event, ResponseReasoningSummaryPartAddedEvent): - pass - - # Handle reasoning summary text delta events - elif isinstance(raw_event, ResponseReasoningSummaryTextDeltaEvent): - item_id = raw_event.item_id - summary_index = raw_event.summary_index - - # If this is a new item_id we haven't seen, create a new message - if item_id and item_id not in item_id_to_index: - message_index += 1 - item_id_to_index[item_id] = message_index - item_id_to_type[item_id] = "reasoning_summary" - - # Send a start event for this new reasoning summary message. - # The start content must be ReasoningContent (not TextContent) - # so consumers that branch on the start event's content type - # render a reasoning/thinking indicator; the final persisted - # content is rebuilt from the reasoning deltas regardless. - yield StreamTaskMessageStart( - type="start", - index=item_id_to_index[item_id], - content=ReasoningContent( - type="reasoning", - author="agent", - summary=[], - content=[], - style="active", - ), - ) - - # Use the index for this item_id - current_index = item_id_to_index.get(item_id, message_index) - - # Yield reasoning summary delta - yield StreamTaskMessageDelta( - type="delta", - index=current_index, - delta=ReasoningSummaryDelta( - type="reasoning_summary", - summary_index=summary_index, - summary_delta=raw_event.delta, - ), - ) - - # Handle reasoning summary text done events - elif isinstance(raw_event, ResponseReasoningSummaryTextDoneEvent): - # We do NOT close the streaming context here - # as there can be multiple reasoning summaries. - # The context will be closed when the entire - # output item is done (ResponseOutputItemDoneEvent) - pass - - # Handle reasoning content text delta events - elif isinstance(raw_event, ResponseReasoningTextDeltaEvent): - item_id = raw_event.item_id - content_index = raw_event.content_index - - # If this is a new item_id we haven't seen, create a new message - if item_id and item_id not in item_id_to_index: - message_index += 1 - item_id_to_index[item_id] = message_index - item_id_to_type[item_id] = "reasoning_content" - - # Send a start event for this new reasoning content message. - # The start content must be ReasoningContent (not TextContent) - # so consumers that branch on the start event's content type - # render a reasoning/thinking indicator; the final persisted - # content is rebuilt from the reasoning deltas regardless. - yield StreamTaskMessageStart( - type="start", - index=item_id_to_index[item_id], - content=ReasoningContent( - type="reasoning", - author="agent", - summary=[], - content=[], - style="active", - ), - ) - - # Use the index for this item_id - current_index = item_id_to_index.get(item_id, message_index) - - # Yield reasoning content delta - yield StreamTaskMessageDelta( - type="delta", - index=current_index, - delta=ReasoningContentDelta( - type="reasoning_content", - content_index=content_index, - content_delta=raw_event.delta, - ), - ) - - # Handle reasoning content text done events - elif isinstance(raw_event, ResponseReasoningTextDoneEvent): - # We do NOT close the streaming context here - # as there can be multiple reasoning content texts. - # The context will be closed when the entire - # output item is done (ResponseOutputItemDoneEvent) - pass - - # Check if this is a text delta event from OpenAI - elif isinstance(raw_event, ResponseTextDeltaEvent): - # Check if this event has an item_id - item_id = getattr(raw_event, "item_id", None) - - # If this is a new item_id we haven't seen, it's a new message - if item_id and item_id not in item_id_to_index: - # Check if this is truly a NEW text message after tools - # We need to differentiate between the first text and the final text after tools - if seen_tool_output: - # This is the final text message after tool execution - message_index += 1 - item_id_to_index[item_id] = message_index - else: - item_id_to_index[item_id] = message_index - - item_id_to_type[item_id] = "text" - - # Send a start event with empty content for this new text message - yield StreamTaskMessageStart( - type="start", - index=item_id_to_index[item_id], - content=TextContent( - type="text", - author="agent", - content="", # Start with empty content, deltas will fill it - ), - ) - - # Use the index for this item_id - current_index = item_id_to_index.get(item_id, message_index) - - delta_message = StreamTaskMessageDelta( - type="delta", - index=current_index, - delta=TextDelta( - type="text", - text_delta=raw_event.delta, - ), - ) - yield delta_message - - elif hasattr(event, "type") and event.type == "run_item_stream_event": - # Skip reasoning_item events - they're handled via raw_response_event above - if hasattr(event, "item") and event.item.type == "reasoning_item": - continue - - # Check for tool_call_item type (this is when a tool is being called) - elif hasattr(event, "item") and event.item.type == "tool_call_item": - # Extract tool call information using the helper method - call_id, tool_name, tool_arguments = _extract_tool_call_info(event.item.raw_item) - tool_map[call_id] = tool_name - tool_request_content = ToolRequestContent( - tool_call_id=call_id, - name=tool_name, - arguments=tool_arguments, - author="agent", - ) - message_index += 1 # Increment for new message - yield StreamTaskMessageFull( - index=message_index, - type="full", - content=tool_request_content, - ) - - # Check for tool_call_output_item type (this is when a tool returns output) - elif hasattr(event, "item") and event.item.type == "tool_call_output_item": - # Extract tool response information using the helper method - call_id, tool_name, content = _extract_tool_response_info(tool_map, event.item.raw_item) - tool_response_content = ToolResponseContent( - tool_call_id=call_id, - name=tool_name, - content=content, - author="agent", - ) - message_index += 1 # Increment for new message - seen_tool_output = True # Mark that we've seen tool output so next text gets new index - yield StreamTaskMessageFull( - type="full", - index=message_index, - content=tool_response_content, - ) +# The OpenAI streaming tap ``convert_openai_to_agentex_events`` now lives in +# ``agentex.lib.adk._modules._openai_sync``; re-exported here for back-compat. +from agentex.lib.adk._modules._openai_sync import ( # noqa: E402 + convert_openai_to_agentex_events as convert_openai_to_agentex_events, +) diff --git a/src/agentex/lib/core/services/adk/providers/openai.py b/src/agentex/lib/core/services/adk/providers/openai.py index 1ae29589d..a2513ea01 100644 --- a/src/agentex/lib/core/services/adk/providers/openai.py +++ b/src/agentex/lib/core/services/adk/providers/openai.py @@ -742,11 +742,10 @@ async def run_agent_streamed_auto_send( ) as span: heartbeat_if_in_workflow("run agent streamed auto send") - # AGX1-378 restored: created_at is now threaded through - # UnifiedEmitter.auto_send_turn -> auto_send -> every - # streaming_task_message_context call, so the first agent message of - # the turn is stamped with the workflow-supplied timestamp (e.g. - # workflow.now()) just as the original inline loop did. + # created_at is threaded through UnifiedEmitter.auto_send_turn -> + # auto_send -> every streaming_task_message_context call, so the + # first agent message of the turn is stamped with the + # workflow-supplied timestamp (e.g. workflow.now()). # The dispenser is still used below for guardrail-rejection messages, # which open their own streaming contexts directly. _take_created_at = _make_created_at_dispenser(created_at) diff --git a/tests/lib/adk/providers/test_openai_activities.py b/tests/lib/adk/providers/test_openai_activities.py index 2f89308a9..964b24545 100644 --- a/tests/lib/adk/providers/test_openai_activities.py +++ b/tests/lib/adk/providers/test_openai_activities.py @@ -653,7 +653,7 @@ def _assert_tools_conversion(self, starting_agent, tools_case, _original_tools): @patch("agents.Runner.run_streamed") async def test_run_agent_streamed_auto_send_forwards_created_at(self, mock_runner_run_streamed): - """created_at is forwarded to every streaming context opened by auto_send_turn (AGX1-378).""" + """created_at is forwarded to every streaming context opened by auto_send_turn.""" from datetime import datetime, timezone from agentex.lib.core.temporal.activities.adk.providers.openai_activities import ( diff --git a/tests/lib/adk/providers/test_openai_turn.py b/tests/lib/adk/providers/test_openai_turn.py index 47a9ba9fe..d5ad2b5c8 100644 --- a/tests/lib/adk/providers/test_openai_turn.py +++ b/tests/lib/adk/providers/test_openai_turn.py @@ -25,7 +25,7 @@ def _import_target(): - from agentex.lib.adk.providers._modules.openai_turn import ( + from agentex.lib.adk._modules._openai_turn import ( OpenAITurn, _aggregate_usage, openai_usage_to_turn_usage, @@ -219,7 +219,7 @@ def stream_events(self): # monkeypatch that converter below so this can yield canonical events. return _canonical_stream(canonical) - import agentex.lib.adk.providers._modules.openai_turn as mod + import agentex.lib.adk._modules._openai_turn as mod async def _passthrough(stream): async for e in stream: From 1efd8dc009f52c16a4efd5be76e09f08eb56c711 Mon Sep 17 00:00:00 2001 From: OpenAI Date: Tue, 23 Jun 2026 22:38:36 +0000 Subject: [PATCH 2/2] fix(harness): sort OpenAI sync imports --- src/agentex/lib/adk/_modules/_openai_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/agentex/lib/adk/_modules/_openai_sync.py b/src/agentex/lib/adk/_modules/_openai_sync.py index f357b881b..75d8f8f2a 100644 --- a/src/agentex/lib/adk/_modules/_openai_sync.py +++ b/src/agentex/lib/adk/_modules/_openai_sync.py @@ -28,6 +28,7 @@ from openai.types.responses.response_reasoning_text_delta_event import ResponseReasoningTextDeltaEvent from openai.types.responses.response_reasoning_summary_text_done_event import ResponseReasoningSummaryTextDoneEvent +from agentex.types.reasoning_content import ReasoningContent from agentex.types.task_message_delta import TextDelta from agentex.types.task_message_update import ( StreamTaskMessageDone, @@ -36,7 +37,6 @@ StreamTaskMessageStart, ) from agentex.types.task_message_content import TextContent -from agentex.types.reasoning_content import ReasoningContent from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent from agentex.types.reasoning_content_delta import ReasoningContentDelta