Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions veadk/runtime/codex/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,20 @@ async def responses(request: Request) -> Any:
call_kwargs["tools"] = [
t for t in call_kwargs["tools"] if t.get("type") == "function"
]
# On multi-step turns Codex replays prior assistant messages in
# `input` without a `status` field, but Ark's Responses API
# requires `status` on assistant messages (MissingParameter:
# input.status). Backfill it so the tool loop survives a model
# preamble ("let me look...") followed by a tool call.
if isinstance(call_kwargs.get("input"), list):
for item in call_kwargs["input"]:
if (
isinstance(item, dict)
and item.get("type") == "message"
and item.get("role") == "assistant"
and "status" not in item
):
item["status"] = "completed"
call_kwargs.update(
model=f"openai/{model}",
api_base=self.api_base,
Expand Down Expand Up @@ -211,10 +225,11 @@ async def _synth_sse(resp: dict[str, Any]) -> AsyncIterator[bytes]:
litellm's chat->Responses bridge cannot produce a real streamed event
sequence for a chat backend, so we expand the completed response into the
ordered events Codex expects: ``response.created`` -> per output item
(``output_item.added`` -> text/reasoning deltas -> ``output_item.done``) ->
``response.completed``. Only ``message`` and ``reasoning`` items are
emitted (the only kinds the bridged chat backend returns); the completed
response is trimmed to match what was streamed.
(``output_item.added`` -> text/reasoning/tool-call deltas ->
``output_item.done``) -> ``response.completed``. ``message``,
``reasoning`` and ``function_call`` items are emitted; the last is what
drives Codex's agentic loop (a dropped tool call ends the turn at the
preamble). The completed response is trimmed to match what was streamed.
"""
seq = 0

Expand All @@ -227,24 +242,45 @@ def ev(payload: dict[str, Any]) -> bytes:
items = [
it
for it in (resp.get("output") or [])
if it.get("type") in ("message", "reasoning")
if it.get("type") in ("message", "reasoning", "function_call")
]
in_progress = {**resp, "status": "in_progress", "output": []}
yield ev({"type": "response.created", "response": in_progress})
yield ev({"type": "response.in_progress", "response": in_progress})

for idx, item in enumerate(items):
item_id = item.get("id", f"item_{idx}")
item_type = item.get("type")
stub = {**item, "status": "in_progress"}
if item.get("type") == "message":
if item_type == "message":
stub = {**stub, "content": []}
else:
elif item_type == "reasoning":
stub = {**stub, "summary": []}
elif item_type == "function_call":
stub = {**stub, "arguments": ""}
yield ev(
{"type": "response.output_item.added", "output_index": idx, "item": stub}
)

if item.get("type") == "message":
if item_type == "function_call":
# Stream the tool call so Codex executes it and continues the loop.
args = item.get("arguments", "") or ""
base = {"item_id": item_id, "output_index": idx}
yield ev(
{
"type": "response.function_call_arguments.delta",
**base,
"delta": args,
}
)
yield ev(
{
"type": "response.function_call_arguments.done",
**base,
"arguments": args,
}
)
elif item_type == "message":
for cidx, part in enumerate(item.get("content") or []):
text = part.get("text", "")
base = {"item_id": item_id, "output_index": idx, "content_index": cidx}
Expand Down
219 changes: 180 additions & 39 deletions veadk/runtime/codex/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

from __future__ import annotations

import json
from enum import Enum
from typing import TYPE_CHECKING, Any

from google.adk.events.event import Event
Expand Down Expand Up @@ -68,59 +70,198 @@ def build_prompt(ctx: "InvocationContext") -> str:
return "\n".join(lines)


def _reasoning_summary(result: Any) -> str:
"""Join the reasoning-summary text from a result's items, if any.
def _item_dict(item: Any) -> dict[str, Any]:
"""Best-effort plain-dict view of a Codex result item."""
if isinstance(item, dict):
return item
if hasattr(item, "model_dump"):
return item.model_dump()
return {}

A reasoning model (e.g. DeepSeek) sometimes ends a turn with only a
``reasoning`` item and no final ``agentMessage``, leaving
``final_response`` empty. In that case the reasoning summary is the only
thing the model produced, so we surface it rather than print nothing.
"""

def _scalar(value: Any) -> Any:
"""Normalize an enum/pydantic value to a JSON-friendly scalar."""
if isinstance(value, Enum):
return value.value
return getattr(value, "value", value)


def _join(entries: Any) -> str:
"""Join a ``list[str]`` (or list of ``{"text": ...}``) into one string."""
parts: list[str] = []
for item in getattr(result, "items", None) or []:
data = item.model_dump() if hasattr(item, "model_dump") else item
if not isinstance(data, dict) or "reasoning" not in str(data.get("type", "")):
continue
for entry in data.get("summary") or []:
if isinstance(entry, str):
parts.append(entry)
elif isinstance(entry, dict) and entry.get("text"):
parts.append(str(entry["text"]))
for entry in entries or []:
if isinstance(entry, str):
parts.append(entry)
elif isinstance(entry, dict) and entry.get("text"):
parts.append(str(entry["text"]))
return "\n".join(p.strip() for p in parts if p and p.strip())


def _parse_args(raw: Any) -> dict[str, Any]:
"""Coerce a tool-call ``arguments`` value into a dict."""
if isinstance(raw, dict):
return raw
if isinstance(raw, str) and raw.strip():
try:
parsed = json.loads(raw)
return parsed if isinstance(parsed, dict) else {"input": parsed}
except json.JSONDecodeError:
return {"input": raw}
return {}


def _tool_call(
    data: dict[str, Any],
) -> tuple[str, dict[str, Any], dict[str, Any]] | None:
    """Map a tool-call thread item to ``(name, args, response)``.

    Handles the item types ``commandExecution``, ``mcpToolCall``,
    ``dynamicToolCall``, ``fileChange`` and ``webSearch``.

    Args:
        data (dict[str, Any]): Plain-dict view of a thread item.

    Returns:
        tuple[str, dict[str, Any], dict[str, Any]] | None: The tool name, the
        call arguments and the response payload, or ``None`` when ``data`` is
        not one of the tool item types above.
    """
    # Normalize ``type`` the same way as ``status``: a dumped model may carry
    # enum members rather than plain strings, and a raw enum would not
    # compare equal to the string literals below.
    itype = _scalar(data.get("type"))
    status = _scalar(data.get("status"))

    if itype == "commandExecution":
        return (
            "exec_command",
            {"command": data.get("command", ""), "cwd": data.get("cwd")},
            {
                "output": data.get("aggregated_output", ""),
                "exit_code": data.get("exit_code"),
                "status": status,
            },
        )

    if itype == "mcpToolCall":
        # Qualified name is "<server>.<tool>", skipping whichever is missing.
        name = ".".join(p for p in (data.get("server"), data.get("tool")) if p)
        return (
            name or "mcp_tool",
            _parse_args(data.get("arguments")),
            {
                "result": data.get("result"),
                "error": data.get("error"),
                "status": status,
            },
        )

    if itype == "dynamicToolCall":
        # Qualified name is "<namespace>.<tool>", skipping whichever is missing.
        name = ".".join(p for p in (data.get("namespace"), data.get("tool")) if p)
        return (
            name or "tool",
            _parse_args(data.get("arguments")),
            {
                "content": data.get("content_items"),
                "success": data.get("success"),
                "status": status,
            },
        )

    if itype == "fileChange":
        return (
            "apply_patch",
            {"changes": data.get("changes")},
            {"status": status},
        )

    if itype == "webSearch":
        # The response status is fixed to "completed" here; the item's own
        # ``status`` field is not consulted for web searches.
        return (
            "web_search",
            {"query": data.get("query"), "action": data.get("action")},
            {"status": "completed"},
        )

    return None


def _event(author: str, invocation_id: str, role: str, part: types.Part) -> Event:
    """Build an ADK ``Event`` whose content holds exactly one part.

    Args:
        author (str): Value for the event's ``author`` field.
        invocation_id (str): Value for the event's ``invocation_id`` field.
        role (str): Role assigned to the event's content.
        part (types.Part): The single part placed in the content.

    Returns:
        Event: The constructed event.
    """
    content = types.Content(role=role, parts=[part])
    return Event(invocation_id=invocation_id, author=author, content=content)


def result_to_events(result: Any, author: str, invocation_id: str) -> list[Event]:
"""Convert a Codex run result into ADK events.
"""Convert a Codex run result into ADK events, faithfully and in order.

The Codex SDK's run result exposes the assistant's final text as
``final_response``. When a turn ends with reasoning but no final message
(``final_response`` empty), fall back to the reasoning summary so the
caller never receives a silently empty turn.
A Codex turn is multi-step. Rather than collapse it to ``final_response``,
walk ``result.items`` and forward each step as its own ADK event, mapping
Codex thread items onto the genai part the matching ADK event expects:

- ``reasoning`` -> a thought text part,
- tool calls (``commandExecution`` / ``mcpToolCall`` / ``dynamicToolCall``
/ ``fileChange`` / ``webSearch``) -> a ``function_call`` part plus a
matching ``function_response`` part carrying the tool's output,
- ``agentMessage`` / ``plan`` / any other text-bearing item -> a text part,
- ``userMessage`` -> skipped (ADK already owns the user turn).

If nothing maps, fall back to ``final_response`` so a turn is never empty.

Args:
result (Any): The object returned by ``thread.run(...)``.
author (str): Event author (the agent name).
invocation_id (str): The ADK invocation id to stamp on the event.
invocation_id (str): The ADK invocation id to stamp on each event.

Returns:
list[google.adk.events.event.Event]: One text event, or empty if the
result carried neither a final message nor reasoning.
list[google.adk.events.event.Event]: The turn's events in order.
"""
text = getattr(result, "final_response", None)
if not text:
summary = _reasoning_summary(result)
if summary:
text = (
"[The model produced only reasoning, no final answer this turn]\n\n"
f"{summary}"
events: list[Event] = []
for item in getattr(result, "items", None) or []:
data = _item_dict(item)
itype = str(data.get("type", ""))

if itype == "userMessage":
continue

if itype == "reasoning":
text = _join(data.get("summary")) or _join(data.get("content"))
if text:
events.append(
_event(
author,
invocation_id,
"model",
types.Part(text=text, thought=True),
)
)
continue

call = _tool_call(data)
if call is not None:
name, args, response = call
call_id = data.get("id") or f"call_{len(events)}"
events.append(
_event(
author,
invocation_id,
"model",
types.Part(
function_call=types.FunctionCall(
id=call_id, name=name, args=args
)
),
)
)
events.append(
_event(
author,
invocation_id,
"user",
types.Part(
function_response=types.FunctionResponse(
id=call_id, name=name, response=response
)
),
)
)
continue

if data.get("text"): # agentMessage, plan, and any text-bearing item
events.append(
_event(
author, invocation_id, "model", types.Part(text=str(data["text"]))
)
)

if events:
return events

# Fallback: never emit nothing.
text = getattr(result, "final_response", None)
if not text:
return []

return [
Event(
invocation_id=invocation_id,
author=author,
content=types.Content(role="model", parts=[types.Part(text=text)]),
)
]
return [_event(author, invocation_id, "model", types.Part(text=text))]
Loading