diff --git a/veadk/runtime/codex/proxy.py b/veadk/runtime/codex/proxy.py index 6d680f73..a190fbd2 100644 --- a/veadk/runtime/codex/proxy.py +++ b/veadk/runtime/codex/proxy.py @@ -103,6 +103,20 @@ async def responses(request: Request) -> Any: call_kwargs["tools"] = [ t for t in call_kwargs["tools"] if t.get("type") == "function" ] + # On multi-step turns Codex replays prior assistant messages in + # `input` without a `status` field, but Ark's Responses API + # requires `status` on assistant messages (MissingParameter: + # input.status). Backfill it so the tool loop survives a model + # preamble ("let me look...") followed by a tool call. + if isinstance(call_kwargs.get("input"), list): + for item in call_kwargs["input"]: + if ( + isinstance(item, dict) + and item.get("type") == "message" + and item.get("role") == "assistant" + and "status" not in item + ): + item["status"] = "completed" call_kwargs.update( model=f"openai/{model}", api_base=self.api_base, @@ -211,10 +225,11 @@ async def _synth_sse(resp: dict[str, Any]) -> AsyncIterator[bytes]: litellm's chat->Responses bridge cannot produce a real streamed event sequence for a chat backend, so we expand the completed response into the ordered events Codex expects: ``response.created`` -> per output item - (``output_item.added`` -> text/reasoning deltas -> ``output_item.done``) -> - ``response.completed``. Only ``message`` and ``reasoning`` items are - emitted (the only kinds the bridged chat backend returns); the completed - response is trimmed to match what was streamed. + (``output_item.added`` -> text/reasoning/tool-call deltas -> + ``output_item.done``) -> ``response.completed``. ``message``, + ``reasoning`` and ``function_call`` items are emitted; the last is what + drives Codex's agentic loop (a dropped tool call ends the turn at the + preamble). The completed response is trimmed to match what was streamed. """ seq = 0 @@ -227,7 +242,7 @@ def ev(payload: dict[str, Any]) -> bytes: items = [ it for it in (resp.get("output") or []) - if it.get("type") in ("message", "reasoning") + if it.get("type") in ("message", "reasoning", "function_call") ] in_progress = {**resp, "status": "in_progress", "output": []} yield ev({"type": "response.created", "response": in_progress}) @@ -235,16 +250,37 @@ def ev(payload: dict[str, Any]) -> bytes: for idx, item in enumerate(items): item_id = item.get("id", f"item_{idx}") + item_type = item.get("type") stub = {**item, "status": "in_progress"} - if item.get("type") == "message": + if item_type == "message": stub = {**stub, "content": []} - else: + elif item_type == "reasoning": stub = {**stub, "summary": []} + elif item_type == "function_call": + stub = {**stub, "arguments": ""} yield ev( {"type": "response.output_item.added", "output_index": idx, "item": stub} ) - if item.get("type") == "message": + if item_type == "function_call": + # Stream the tool call so Codex executes it and continues the loop. + args = item.get("arguments", "") or "" + base = {"item_id": item_id, "output_index": idx} + yield ev( + { + "type": "response.function_call_arguments.delta", + **base, + "delta": args, + } + ) + yield ev( + { + "type": "response.function_call_arguments.done", + **base, + "arguments": args, + } + ) + elif item_type == "message": for cidx, part in enumerate(item.get("content") or []): text = part.get("text", "") base = {"item_id": item_id, "output_index": idx, "content_index": cidx} diff --git a/veadk/runtime/codex/translate.py b/veadk/runtime/codex/translate.py index 5e699941..a98e7ac4 100644 --- a/veadk/runtime/codex/translate.py +++ b/veadk/runtime/codex/translate.py @@ -23,6 +23,8 @@ from __future__ import annotations +import json +from enum import Enum from typing import TYPE_CHECKING, Any from google.adk.events.event import Event @@ -68,59 +70,198 @@ def build_prompt(ctx: "InvocationContext") -> str: return "\n".join(lines) -def _reasoning_summary(result: Any) -> str: - """Join the reasoning-summary text from a result's items, if any. +def _item_dict(item: Any) -> dict[str, Any]: + """Best-effort plain-dict view of a Codex result item.""" + if isinstance(item, dict): + return item + if hasattr(item, "model_dump"): + return item.model_dump() + return {} - A reasoning model (e.g. DeepSeek) sometimes ends a turn with only a - ``reasoning`` item and no final ``agentMessage``, leaving - ``final_response`` empty. In that case the reasoning summary is the only - thing the model produced, so we surface it rather than print nothing. - """ + +def _scalar(value: Any) -> Any: + """Normalize an enum/pydantic value to a JSON-friendly scalar.""" + if isinstance(value, Enum): + return value.value + return getattr(value, "value", value) + + +def _join(entries: Any) -> str: + """Join a ``list[str]`` (or list of ``{"text": ...}``) into one string.""" parts: list[str] = [] - for item in getattr(result, "items", None) or []: - data = item.model_dump() if hasattr(item, "model_dump") else item - if not isinstance(data, dict) or "reasoning" not in str(data.get("type", "")): - continue - for entry in data.get("summary") or []: - if isinstance(entry, str): - parts.append(entry) - elif isinstance(entry, dict) and entry.get("text"): - parts.append(str(entry["text"])) + for entry in entries or []: + if isinstance(entry, str): + parts.append(entry) + elif isinstance(entry, dict) and entry.get("text"): + parts.append(str(entry["text"])) return "\n".join(p.strip() for p in parts if p and p.strip()) +def _parse_args(raw: Any) -> dict[str, Any]: + """Coerce a tool-call ``arguments`` value into a dict.""" + if isinstance(raw, dict): + return raw + if isinstance(raw, str) and raw.strip(): + try: + parsed = json.loads(raw) + return parsed if isinstance(parsed, dict) else {"input": parsed} + except json.JSONDecodeError: + return {"input": raw} + return {} + + +def _tool_call( + data: dict[str, Any], +) -> tuple[str, dict[str, Any], dict[str, Any]] | None: + """Map a tool-call thread item to ``(name, args, response)``. + + Covers every tool item type the Codex SDK can surface; returns ``None`` for + non-tool items. + """ + itype = data.get("type") + status = _scalar(data.get("status")) + if itype == "commandExecution": + return ( + "exec_command", + {"command": data.get("command", ""), "cwd": data.get("cwd")}, + { + "output": data.get("aggregated_output", ""), + "exit_code": data.get("exit_code"), + "status": status, + }, + ) + if itype == "mcpToolCall": + name = ".".join(p for p in (data.get("server"), data.get("tool")) if p) + return ( + name or "mcp_tool", + _parse_args(data.get("arguments")), + { + "result": data.get("result"), + "error": data.get("error"), + "status": status, + }, + ) + if itype == "dynamicToolCall": + name = ".".join(p for p in (data.get("namespace"), data.get("tool")) if p) + return ( + name or "tool", + _parse_args(data.get("arguments")), + { + "content": data.get("content_items"), + "success": data.get("success"), + "status": status, + }, + ) + if itype == "fileChange": + return ( + "apply_patch", + {"changes": data.get("changes")}, + {"status": status}, + ) + if itype == "webSearch": + return ( + "web_search", + {"query": data.get("query"), "action": data.get("action")}, + {"status": "completed"}, + ) + return None + + +def _event(author: str, invocation_id: str, role: str, part: types.Part) -> Event: + return Event( + invocation_id=invocation_id, + author=author, + content=types.Content(role=role, parts=[part]), + ) + + def result_to_events(result: Any, author: str, invocation_id: str) -> list[Event]: - """Convert a Codex run result into ADK events. + """Convert a Codex run result into ADK events, faithfully and in order. - The Codex SDK's run result exposes the assistant's final text as - ``final_response``. When a turn ends with reasoning but no final message - (``final_response`` empty), fall back to the reasoning summary so the - caller never receives a silently empty turn. + A Codex turn is multi-step. Rather than collapse it to ``final_response``, + walk ``result.items`` and forward each step as its own ADK event, mapping + Codex thread items onto the genai part the matching ADK event expects: + + - ``reasoning`` -> a thought text part, + - tool calls (``commandExecution`` / ``mcpToolCall`` / ``dynamicToolCall`` + / ``fileChange`` / ``webSearch``) -> a ``function_call`` part plus a + matching ``function_response`` part carrying the tool's output, + - ``agentMessage`` / ``plan`` / any other text-bearing item -> a text part, + - ``userMessage`` -> skipped (ADK already owns the user turn). + + If nothing maps, fall back to ``final_response`` so a turn is never empty. Args: result (Any): The object returned by ``thread.run(...)``. author (str): Event author (the agent name). - invocation_id (str): The ADK invocation id to stamp on the event. + invocation_id (str): The ADK invocation id to stamp on each event. Returns: - list[google.adk.events.event.Event]: One text event, or empty if the - result carried neither a final message nor reasoning. + list[google.adk.events.event.Event]: The turn's events in order. """ - text = getattr(result, "final_response", None) - if not text: - summary = _reasoning_summary(result) - if summary: - text = ( - "[The model produced only reasoning, no final answer this turn]\n\n" - f"{summary}" + events: list[Event] = [] + for item in getattr(result, "items", None) or []: + data = _item_dict(item) + itype = str(data.get("type", "")) + + if itype == "userMessage": + continue + + if itype == "reasoning": + text = _join(data.get("summary")) or _join(data.get("content")) + if text: + events.append( + _event( + author, + invocation_id, + "model", + types.Part(text=text, thought=True), + ) + ) + continue + + call = _tool_call(data) + if call is not None: + name, args, response = call + call_id = data.get("id") or f"call_{len(events)}" + events.append( + _event( + author, + invocation_id, + "model", + types.Part( + function_call=types.FunctionCall( + id=call_id, name=name, args=args + ) + ), + ) ) + events.append( + _event( + author, + invocation_id, + "user", + types.Part( + function_response=types.FunctionResponse( + id=call_id, name=name, response=response + ) + ), + ) + ) + continue + + if data.get("text"): # agentMessage, plan, and any text-bearing item + events.append( + _event( + author, invocation_id, "model", types.Part(text=str(data["text"])) + ) + ) + + if events: + return events + + # Fallback: never emit nothing. + text = getattr(result, "final_response", None) if not text: return [] - - return [ - Event( - invocation_id=invocation_id, - author=author, - content=types.Content(role="model", parts=[types.Part(text=text)]), - ) - ] + return [_event(author, invocation_id, "model", types.Part(text=text))]