[integration] Agent workflows (big-agents)#4791
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
feat(sdk): agent runtime behind backend/harness ports
There was a problem hiding this comment.
Actionable comments posted: 10
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 76c33a7d-feff-4e5f-acc0-962498f74cfc
📒 Files selected for processing (70)
sdks/python/agenta/__init__.pysdks/python/agenta/sdk/agents/__init__.pysdks/python/agenta/sdk/agents/adapters/__init__.pysdks/python/agenta/sdk/agents/adapters/_runner_config.pysdks/python/agenta/sdk/agents/adapters/agenta_builtins.pysdks/python/agenta/sdk/agents/adapters/harnesses.pysdks/python/agenta/sdk/agents/adapters/in_process.pysdks/python/agenta/sdk/agents/adapters/local.pysdks/python/agenta/sdk/agents/adapters/sandbox_agent.pysdks/python/agenta/sdk/agents/adapters/vercel/__init__.pysdks/python/agenta/sdk/agents/adapters/vercel/messages.pysdks/python/agenta/sdk/agents/adapters/vercel/routing.pysdks/python/agenta/sdk/agents/adapters/vercel/sse.pysdks/python/agenta/sdk/agents/adapters/vercel/stream.pysdks/python/agenta/sdk/agents/dtos.pysdks/python/agenta/sdk/agents/errors.pysdks/python/agenta/sdk/agents/interfaces.pysdks/python/agenta/sdk/agents/mcp/__init__.pysdks/python/agenta/sdk/agents/mcp/errors.pysdks/python/agenta/sdk/agents/mcp/interfaces.pysdks/python/agenta/sdk/agents/mcp/models.pysdks/python/agenta/sdk/agents/mcp/parsing.pysdks/python/agenta/sdk/agents/mcp/resolver.pysdks/python/agenta/sdk/agents/mcp/wire.pysdks/python/agenta/sdk/agents/streaming.pysdks/python/agenta/sdk/agents/tools/__init__.pysdks/python/agenta/sdk/agents/tools/compat.pysdks/python/agenta/sdk/agents/tools/errors.pysdks/python/agenta/sdk/agents/tools/interfaces.pysdks/python/agenta/sdk/agents/tools/models.pysdks/python/agenta/sdk/agents/tools/parsing.pysdks/python/agenta/sdk/agents/tools/resolver.pysdks/python/agenta/sdk/agents/tools/wire.pysdks/python/agenta/sdk/agents/ui_messages.pysdks/python/agenta/sdk/agents/utils/__init__.pysdks/python/agenta/sdk/agents/utils/ts_runner.pysdks/python/agenta/sdk/agents/utils/wire.pysdks/python/agenta/sdk/decorators/routing.pysdks/python/agenta/sdk/engines/running/interfaces.pysdks/python/agenta/sdk/engines/running/utils.pysdks/python/agenta/sdk/middlewares/running/normalizer.pysdks/python/agenta/sdk/models/workflows.pysdks/python/agenta/sdk/utils/types.pysdks/python/agenta/tests/agents/test_streaming.pysdks/python/oss/tests/pytest/integration/agents/__init__.pysdks/python/oss/tests/pytest/integration/agents/test_transport_roundtrip.pysdks/python/oss/tests/pytest/unit/agents/__init__.pysdks/python/oss/tests/pytest/unit/agents/conftest.pysdks/python/oss/tests/pytest/unit/agents/golden/run_request.claude.jsonsdks/python/oss/tests/pytest/unit/agents/golden/run_request.pi.jsonsdks/python/oss/tests/pytest/unit/agents/golden/run_result.error.jsonsdks/python/oss/tests/pytest/unit/agents/golden/run_result.ok.jsonsdks/python/oss/tests/pytest/unit/agents/mcp/__init__.pysdks/python/oss/tests/pytest/unit/agents/mcp/test_resolver.pysdks/python/oss/tests/pytest/unit/agents/test_dtos_agent_config.pysdks/python/oss/tests/pytest/unit/agents/test_dtos_capabilities_events.pysdks/python/oss/tests/pytest/unit/agents/test_dtos_content_blocks.pysdks/python/oss/tests/pytest/unit/agents/test_dtos_harness_configs.pysdks/python/oss/tests/pytest/unit/agents/test_environment_lifecycle.pysdks/python/oss/tests/pytest/unit/agents/test_harness_adapters.pysdks/python/oss/tests/pytest/unit/agents/test_runner_adapter_config.pysdks/python/oss/tests/pytest/unit/agents/test_ui_messages.pysdks/python/oss/tests/pytest/unit/agents/test_wire_contract.pysdks/python/oss/tests/pytest/unit/agents/tools/__init__.pysdks/python/oss/tests/pytest/unit/agents/tools/test_models.pysdks/python/oss/tests/pytest/unit/agents/tools/test_parsing.pysdks/python/oss/tests/pytest/unit/agents/tools/test_resolver.pysdks/python/oss/tests/pytest/unit/test_normalizer_passthrough.pysdks/python/oss/tests/pytest/utils/test_messages_endpoint.pysdks/python/oss/tests/pytest/utils/test_routing.py
| NOTE on packaging: the Node runner is NOT part of this Python wheel (``pip install agenta`` | ||
| stays pure Python; the wheel contains zero ``.ts``/``.js``). How a standalone Pi user obtains | ||
| the runner -- an ``npx`` npm package, a local checkout, or a Docker sidecar over HTTP -- is an | ||
| open distribution decision; see ``docs/design/agent-workflows/typescript-structure/``. Do NOT | ||
| silently bundle a JS runner into the wheel. |
There was a problem hiding this comment.
Align LocalBackend wording with the stated packaging contract.
Line 9-13 says the wheel must not bundle a JS runner, but Line 30 and the NotImplementedError messages still say “bundled JS”. This contradiction will confuse integrators.
Suggested wording fix
-class LocalBackend(Backend):
- """Run Pi (bundled JS) or Claude (``claude-agent-sdk``) on this machine."""
+class LocalBackend(Backend):
+ """Run Pi (external Node runner) or Claude (``claude-agent-sdk``) on this machine."""
...
raise NotImplementedError(
- "LocalBackend is not implemented yet (Phase 3: Pi via bundled JS, "
+ "LocalBackend is not implemented yet (Phase 3: Pi via external Node runner, "
"Phase 4: Claude via claude-agent-sdk)."
)
...
raise NotImplementedError(
- "LocalBackend is not implemented yet (Phase 3: Pi via bundled JS, "
+ "LocalBackend is not implemented yet (Phase 3: Pi via external Node runner, "
"Phase 4: Claude via claude-agent-sdk)."
)Also applies to: 30-38, 50-53
| def __init__( | ||
| self, | ||
| *, | ||
| sandbox: str = "local", | ||
| url: Optional[str] = None, | ||
| command: Optional[Sequence[str]] = None, | ||
| cwd: Optional[str] = None, | ||
| timeout: float = float(os.getenv("AGENTA_AGENT_RUNNER_TIMEOUT_SECONDS", "180")), | ||
| ) -> None: | ||
| self._sandbox = sandbox | ||
| self._url = url |
There was a problem hiding this comment.
Validate sandbox at construction time.
Line 129 currently accepts any string; invalid values get sent over the wire and fail late. Restrict this to supported values (local, daytona) and raise a configuration error early.
Suggested validation
from ..dtos import (
@@
)
+from ..errors import AgentRunnerConfigurationError
@@
def __init__(
self,
*,
sandbox: str = "local",
@@
timeout: float = float(os.getenv("AGENTA_AGENT_RUNNER_TIMEOUT_SECONDS", "180")),
) -> None:
+ allowed_sandboxes = {"local", "daytona"}
+ if sandbox not in allowed_sandboxes:
+ raise AgentRunnerConfigurationError(
+ f"Unsupported sandbox '{sandbox}'. Expected one of: {sorted(allowed_sandboxes)}."
+ )
self._sandbox = sandbox
self._url = url| def _tool_part_blocks(part: Dict[str, Any], ptype: str) -> List[ContentBlock]: | ||
| """A Vercel tool part -> neutral tool-call/result content blocks.""" | ||
| tool_call_id = part.get("toolCallId") or part.get("tool_call_id") | ||
| tool_name = part.get("toolName") or part.get("tool_name") | ||
| if ( | ||
| tool_name is None | ||
| and ptype.startswith("tool-") | ||
| and ptype != TOOL_OUTPUT_AVAILABLE | ||
| ): | ||
| tool_name = ptype[len("tool-") :] | ||
|
|
||
| blocks: List[ContentBlock] = [] | ||
| if ptype != TOOL_OUTPUT_AVAILABLE or "input" in part: | ||
| blocks.append( | ||
| ContentBlock( | ||
| type="tool_call", | ||
| tool_call_id=tool_call_id, | ||
| tool_name=tool_name, | ||
| input=part.get("input"), | ||
| ) | ||
| ) | ||
|
|
||
| state = part.get("state") | ||
| error_text = part.get("errorText") | ||
| if error_text is not None or state == "output-error": | ||
| blocks.append( | ||
| ContentBlock( | ||
| type="tool_result", | ||
| tool_call_id=tool_call_id, | ||
| tool_name=tool_name, | ||
| output=error_text if error_text is not None else part.get("output"), | ||
| is_error=True, | ||
| ) | ||
| ) | ||
| elif "output" in part or state == "output-available": | ||
| blocks.append( | ||
| ContentBlock( | ||
| type="tool_result", | ||
| tool_call_id=tool_call_id, | ||
| tool_name=tool_name, | ||
| output=part.get("output"), | ||
| is_error=False, | ||
| ) | ||
| ) | ||
| return blocks |
There was a problem hiding this comment.
Output-only tool-* parts are incorrectly turned into synthetic tool_call blocks.
_tool_part_blocks currently emits a tool_call for most tool-* parts even when the part is already in an output state. That fabricates an extra call (often with input=None) and distorts stored history for subsequent turns.
Proposed fix
def _tool_part_blocks(part: Dict[str, Any], ptype: str) -> List[ContentBlock]:
"""A Vercel tool part -> neutral tool-call/result content blocks."""
tool_call_id = part.get("toolCallId") or part.get("tool_call_id")
tool_name = part.get("toolName") or part.get("tool_name")
@@
- blocks: List[ContentBlock] = []
- if ptype != TOOL_OUTPUT_AVAILABLE or "input" in part:
+ state = part.get("state")
+ error_text = part.get("errorText")
+ has_input = "input" in part
+ emits_output = (
+ error_text is not None
+ or state in {"output-available", "output-error"}
+ or "output" in part
+ )
+
+ blocks: List[ContentBlock] = []
+ if has_input or not emits_output:
blocks.append(
ContentBlock(
type="tool_call",
tool_call_id=tool_call_id,
tool_name=tool_name,
input=part.get("input"),
)
)
- state = part.get("state")
- error_text = part.get("errorText")
if error_text is not None or state == "output-error":
blocks.append(
ContentBlock(
type="tool_result",
tool_call_id=tool_call_id,| llm_config = prompt_cfg.get("llm_config") or {} | ||
| model = llm_config.get("model") or defaults.model | ||
| instructions = _system_text(prompt_cfg.get("messages")) or defaults.instructions | ||
| raw_tools = llm_config.get("tools") | ||
| if raw_tools is None: | ||
| raw_tools = prompt_cfg.get("tools") | ||
| else: |
There was a problem hiding this comment.
Guard llm_config type before dictionary access.
Line 694 assumes prompt["llm_config"] is a dict. If it’s a non-dict value, this path crashes with AttributeError instead of applying defaults.
Proposed fix
prompt_cfg = params.get("prompt")
if isinstance(prompt_cfg, dict):
- llm_config = prompt_cfg.get("llm_config") or {}
+ raw_llm_config = prompt_cfg.get("llm_config")
+ llm_config = raw_llm_config if isinstance(raw_llm_config, dict) else {}
model = llm_config.get("model") or defaults.model
instructions = _system_text(prompt_cfg.get("messages")) or defaults.instructions
raw_tools = llm_config.get("tools")
if raw_tools is None:
raw_tools = prompt_cfg.get("tools")| sandbox = await self._sandbox() | ||
| if provisioning: | ||
| await sandbox.add_files(provisioning) | ||
| return await self._backend.create_session( | ||
| sandbox, | ||
| config, | ||
| harness=harness, | ||
| secrets=session_config.secrets, | ||
| trace=session_config.trace, | ||
| session_id=session_config.session_id, | ||
| ) |
There was a problem hiding this comment.
Destroy per-session sandbox on setup/session-creation failure.
If Line 224 (add_files) or Line 225 (create_session) raises, a per-session sandbox is left alive with no owner to tear it down.
Proposed fix
async def create_session(
self,
config: HarnessAgentConfig,
*,
harness: HarnessType,
session_config: SessionConfig,
provisioning: Optional[Mapping[str, bytes]] = None,
) -> Session:
"""Provision a sandbox per policy, then open a session in it."""
sandbox = await self._sandbox()
- if provisioning:
- await sandbox.add_files(provisioning)
- return await self._backend.create_session(
- sandbox,
- config,
- harness=harness,
- secrets=session_config.secrets,
- trace=session_config.trace,
- session_id=session_config.session_id,
- )
+ try:
+ if provisioning:
+ await sandbox.add_files(provisioning)
+ return await self._backend.create_session(
+ sandbox,
+ config,
+ harness=harness,
+ secrets=session_config.secrets,
+ trace=session_config.trace,
+ session_id=session_config.session_id,
+ )
+ except Exception:
+ if self._sandbox_per_session:
+ try:
+ await sandbox.destroy()
+ except Exception:
+ pass
+ raise📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| sandbox = await self._sandbox() | |
| if provisioning: | |
| await sandbox.add_files(provisioning) | |
| return await self._backend.create_session( | |
| sandbox, | |
| config, | |
| harness=harness, | |
| secrets=session_config.secrets, | |
| trace=session_config.trace, | |
| session_id=session_config.session_id, | |
| ) | |
| sandbox = await self._sandbox() | |
| try: | |
| if provisioning: | |
| await sandbox.add_files(provisioning) | |
| return await self._backend.create_session( | |
| sandbox, | |
| config, | |
| harness=harness, | |
| secrets=session_config.secrets, | |
| trace=session_config.trace, | |
| session_id=session_config.session_id, | |
| ) | |
| except Exception: | |
| if self._sandbox_per_session: | |
| try: | |
| await sandbox.destroy() | |
| except Exception: | |
| pass | |
| raise |
| session = await self.create_session(config) | ||
|
|
||
| def _absorb(result: AgentResult) -> None: | ||
| if result.session_id: | ||
| config.session_id = result.session_id | ||
|
|
||
| return session.stream(messages).on_result(_absorb).on_cleanup(session.destroy) |
There was a problem hiding this comment.
Ensure session cleanup if stream setup fails synchronously.
Line 321 only registers cleanup after session.stream(messages) succeeds. If stream construction raises, the session is leaked.
Proposed fix
session = await self.create_session(config)
+ try:
+ run = session.stream(messages)
+ except Exception:
+ await session.destroy()
+ raise
def _absorb(result: AgentResult) -> None:
if result.session_id:
config.session_id = result.session_id
- return session.stream(messages).on_result(_absorb).on_cleanup(session.destroy)
+ return run.on_result(_absorb).on_cleanup(session.destroy)| from agenta.sdk.agents.tools.models import MissingSecretPolicy | ||
|
|
||
| from .errors import MissingMCPSecretError | ||
| from .interfaces import MCPSecretProvider | ||
| from .models import MCPServerConfig, ResolvedMCPServer | ||
|
|
||
|
|
||
| class MCPResolver: | ||
| def __init__( | ||
| self, | ||
| *, | ||
| secret_provider: MCPSecretProvider, | ||
| missing_secret_policy: MissingSecretPolicy = MissingSecretPolicy.ERROR, | ||
| ) -> None: |
There was a problem hiding this comment.
Breaks declared layer direction by importing tools model into MCP.
MCPResolver currently depends on agenta.sdk.agents.tools.models.MissingSecretPolicy, but this cohort declares tools as depending on MCP, not the other way around. This reverse edge can create import-order fragility and circular dependency risk as the stack evolves. Move MissingSecretPolicy to a neutral/shared module (or MCP/shared contract module) and import it from both subsystems.
Possible direction
- from agenta.sdk.agents.tools.models import MissingSecretPolicy
+ from agenta.sdk.agents.shared.missing_secret_policy import MissingSecretPolicy(then define/move the enum in that shared module and update tools imports accordingly)
| out = stdout.decode("utf-8", "replace") | ||
| err = stderr.decode("utf-8", "replace") | ||
| if not out.strip(): | ||
| raise RuntimeError( | ||
| f"Agent runner returned no output. exit={proc.returncode} stderr={err[-2000:]}" | ||
| ) | ||
| try: | ||
| return json.loads(out) | ||
| except json.JSONDecodeError as exc: |
There was a problem hiding this comment.
Treat non-zero subprocess exit as transport failure even with parseable JSON.
Line 74 returns parsed JSON without checking proc.returncode; a crashed runner can look successful if it emitted partial/legacy JSON before exiting non-zero.
Suggested fix
@@ async def deliver_subprocess(...):
out = stdout.decode("utf-8", "replace")
err = stderr.decode("utf-8", "replace")
+ if proc.returncode not in (0, None):
+ raise RuntimeError(
+ "Agent runner exited non-zero. "
+ f"exit={proc.returncode} stderr={err[-2000:]} stdout={out[:500]}"
+ )
if not out.strip():
raise RuntimeError(
f"Agent runner returned no output. exit={proc.returncode} stderr={err[-2000:]}"
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| out = stdout.decode("utf-8", "replace") | |
| err = stderr.decode("utf-8", "replace") | |
| if not out.strip(): | |
| raise RuntimeError( | |
| f"Agent runner returned no output. exit={proc.returncode} stderr={err[-2000:]}" | |
| ) | |
| try: | |
| return json.loads(out) | |
| except json.JSONDecodeError as exc: | |
| out = stdout.decode("utf-8", "replace") | |
| err = stderr.decode("utf-8", "replace") | |
| if proc.returncode not in (0, None): | |
| raise RuntimeError( | |
| "Agent runner exited non-zero. " | |
| f"exit={proc.returncode} stderr={err[-2000:]} stdout={out[:500]}" | |
| ) | |
| if not out.strip(): | |
| raise RuntimeError( | |
| f"Agent runner returned no output. exit={proc.returncode} stderr={err[-2000:]}" | |
| ) | |
| try: | |
| return json.loads(out) | |
| except json.JSONDecodeError as exc: |
| async for line in response.aiter_lines(): | ||
| line = line.strip() | ||
| if line: | ||
| record = json.loads(line) | ||
| if record.get("kind") == "result": | ||
| saw_result = True | ||
| yield record | ||
| if not saw_result: | ||
| raise RuntimeError("Agent runner stream ended without a terminal result record") |
There was a problem hiding this comment.
Enforce a single terminal result record in stream transports.
Line 120 and Line 168 set saw_result=True but still allow additional records, which violates the “exactly one terminal result” stream contract and can cause ambiguous terminal state handling.
Suggested fix
@@ async def deliver_http_stream(...):
- async for line in response.aiter_lines():
+ async for line in response.aiter_lines():
line = line.strip()
if line:
record = json.loads(line)
+ if saw_result:
+ raise RuntimeError(
+ "Agent runner emitted records after terminal result record"
+ )
if record.get("kind") == "result":
- saw_result = True
+ if saw_result:
+ raise RuntimeError(
+ "Agent runner emitted multiple terminal result records"
+ )
+ saw_result = True
yield record
@@ async def deliver_subprocess_stream(...):
line = raw.decode("utf-8", "replace").strip()
if line:
record = json.loads(line)
+ if saw_result:
+ raise RuntimeError(
+ "Agent runner emitted records after terminal result record"
+ )
if record.get("kind") == "result":
- saw_result = True
+ if saw_result:
+ raise RuntimeError(
+ "Agent runner emitted multiple terminal result records"
+ )
+ saw_result = True
yield recordAlso applies to: 167-175
| # agenta:builtin:* — application-only (not evaluators) | ||
| ("builtin", "chat"): (True, False, False), | ||
| ("builtin", "completion"): (True, False, False), | ||
| ("builtin", "agent"): (True, False, False), |
There was a problem hiding this comment.
is_agent is never inferred, so agent workflows keep WorkflowFlags.is_agent=False.
You added the built-in agent role mapping, but infer_flags_from_data still never computes/passes is_agent into WorkflowFlags, so the new agent flag/filter path won’t work as intended.
💡 Proposed fix
@@
- is_chat = key == "chat" or _has_messages_input(inputs_schema)
+ is_chat = key == "chat" or _has_messages_input(inputs_schema)
+ is_agent = key == "agent"
@@
return WorkflowFlags(
@@
# schema-derived
is_chat=is_chat,
+ is_agent=is_agent,
# interface-derived
has_url=has_url,
Context
big-agentsis the integration branch for the agent-workflows feature. Every agent PR targetsbig-agents(directly, or by stacking on one that does). The plan is to review and merge each sub-PR intobig-agents, then mergebig-agentsintomainas a single unit.This PR is a draft tracker. It stays open until all the sub-PRs below are merged into
big-agents. The branch starts from an empty commit, so the diff fills in as sub-PRs land.Integrated PRs
Each box gets checked when that PR is merged into
big-agents. Indented items stack on the item above them.SDK and service
Runner
Frontend
Hosting
Sandbox-agent deployment
Docs
Branch-only (no PR yet)
These design-doc branches are stacked on
big-agentsbut have no PR. Open one if you want them reviewed separately, otherwise they fold in with the docs.docs/agent-model-config-and-provider-authdocs/agent-skills-configdocs/agent-code-tool-sandboxdocs/agent-harness-capabilitiesNotes
rivet → sandbox-agentrename folded in. The in-place apply of test(agent): vitest suite + CI for the agent runner; fix relay error bug #4784 conflicts with the rename, so it gets rebuilt during review.