diff --git a/.stats.yml b/.stats.yml
index 5375d17e3..fa83f7dfa 100644
--- a/.stats.yml
+++ b/.stats.yml
@@ -1,4 +1,4 @@
 configured_endpoints: 64
-openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/sgp/agentex-sdk-ae2571b5ac5d337ba5ced527cec0ff6e3088296fa67c3c836ed5a06544b25cb8.yml
-openapi_spec_hash: 962a2f20444c7823fd3a34f95365146e
+openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/sgp/agentex-sdk-cd43ba4b554ca024dd7ee7b74e4f4700a743282c17def704a0967e6ff251c09b.yml
+openapi_spec_hash: 9369ccc9c0289e9d6f641a526d244d1c
 config_hash: 138b7c0b394e7393133c8ff16a6d0eb3
diff --git a/src/agentex/lib/sdk/utils/webhooks.py b/src/agentex/lib/sdk/utils/webhooks.py
new file mode 100644
index 000000000..d4b7b43e1
--- /dev/null
+++ b/src/agentex/lib/sdk/utils/webhooks.py
@@ -0,0 +1,392 @@
+"""Drive an agent turn from an inbound webhook, inside a forward-route handler.
+
+The Agentex server already exposes a webhook ingress: a request to
+``/agents/forward/name/{agent}/{path}`` is signature-verified (GitHub ``sha256=`` /
+Slack ``v0:`` HMAC via the agent's registered keys) and proxied to the agent's own
+HTTP route. This helper is what that route handler calls to turn the inbound payload
+into an agent turn — without each agent re-implementing payload shaping, config
+resolution, session continuity, and reply handling.
+
+Typical use inside an agent::
+
+    from fastapi import Request
+    from agentex.lib.sdk.utils.webhooks import handle_webhook
+
+
+    @acp.post("/github-pr")
+    async def github_pr(request: Request):
+        body = await request.json()
+        result = await handle_webhook(
+            agent_name="my-agent",
+            payload=body,
+            acp_type="sync",
+            shaper="github_pr",
+            params_source="https://{host}/public/v5/agent_configs/{config_id}/resolve",
+            params_source_headers={"x-api-key": ..., "x-selected-account-id": ...},
+            wait=True,
+        )
+        return {"task_id": result.task_id, "reply": result.reply}
+
+Config-by-id: pass ``params_source`` pointing at the platform's config-resolve
+endpoint; the resolved params (e.g. system_prompt / harness / model / tools) are
+forwarded opaquely to ``task/create``. Or pass inline ``params`` for a one-off.
+"""
+
+from __future__ import annotations
+
+import json
+import hashlib
+from typing import Any, Literal
+from dataclasses import field, dataclass
+from collections.abc import Mapping, Callable, Awaitable
+
+from agentex.lib import adk
+from agentex.lib.utils.logging import make_logger
+from agentex.types.task_message_content import TextContent
+
+logger = make_logger(__name__)
+
+# Injectable params fetcher (url -> JSON). Default uses httpx; tests inject a fake.
+ParamsFetcher = Callable[[str], Awaitable[dict[str, Any]]]
+
+MAX_BODY_CHARS = 4000
+MAX_DIFF_CHARS = 30000
+
+
+class WebhookError(RuntimeError):
+    """Raised when a webhook turn cannot be driven (e.g. params resolution failed)."""
+
+
+@dataclass
+class WebhookResult:
+    task_id: str
+    # Sync agents reply inline. For async agents, ``reply`` is None unless ``wait`` was
+    # set, in which case it is the polled reply (or None if it didn't settle in time).
+    reply: str | None = None
+    task_metadata: dict[str, str] = field(default_factory=dict)
+
+
+# --------------------------------------------------------------------------- shaping
+
+
+def session_key(agent_name: str, channel: str, peer_id: str) -> str:
+    """Stable per-conversation task name → reused for get-or-create on task/create, so
+    repeat events from the same source fold into one task instead of spawning new ones."""
+    basis = peer_id or "main"
+    # SHA-1 is only a naming digest here; the flag keeps FIPS-restricted builds working.
+    digest = hashlib.sha1(f"{agent_name}:{channel}:{basis}".encode(), usedforsecurity=False).hexdigest()[:16]
+    return f"wh-{channel}-{digest}"
+
+
+# Top-level fields a generic webhook payload might carry its prompt in, in priority
+# order. Matched case-insensitively against the payload's keys.
+GENERIC_PROMPT_KEYS = (
+    "text",
+    "message",
+    "prompt",
+    "goal",
+    "content",
+    "body",
+    "description",
+    "title",
+)
+
+
+def render_generic(body: dict[str, Any]) -> str:
+    """Generic payload → prompt text: first non-empty string among GENERIC_PROMPT_KEYS
+    (case-insensitive), else raw JSON."""
+    lowered = {key.lower(): value for key, value in body.items() if isinstance(key, str)}
+    for key in GENERIC_PROMPT_KEYS:
+        value = lowered.get(key)
+        if isinstance(value, str) and value.strip():
+            return value.strip()
+    return json.dumps(body, indent=2)[:8000]
+
+
+def shape_github_pr(body: dict[str, Any]) -> tuple[str, str | None, str]:
+    """Shape a GitHub/Gitea pull-request webhook into (prompt, peer_id, sender).
+
+    ``peer_id`` is ``repo#number`` so repeated events for the same PR (opened,
+    synchronize, ...) fold into one task. Falls back to generic rendering for non-PR
+    payloads (ping, issue, ...).
+    """
+    pull_request = body.get("pull_request")
+    if not isinstance(pull_request, dict):
+        return render_generic(body), None, _github_actor(body)
+
+    repo = _repo_full_name(body)
+    number = pull_request.get("number")
+    title = (pull_request.get("title") or "").strip()
+    action = (body.get("action") or "").strip()
+    description = (pull_request.get("body") or "").strip()
+    html_url = pull_request.get("html_url") or pull_request.get("url")
+
+    header = "Pull request"
+    if repo and number is not None:
+        header = f"Pull request {repo}#{number}"
+    elif number is not None:
+        header = f"Pull request #{number}"
+
+    lines = [f"{header}: {title}" if title else header]
+    if action:
+        lines.append(f"Action: {action}")
+    if html_url:
+        lines.append(f"URL: {html_url}")
+    if description:
+        lines.extend(["", "Description:", description[:MAX_BODY_CHARS]])
+
+    diff = _inline_diff(body, pull_request)
+    if diff:
+        lines.extend(["", "Diff:", diff[:MAX_DIFF_CHARS]])
+    else:
+        # Standard GitHub/Gitea payloads carry a diff/patch URL, not the patch body.
+        # Surface it so a tool-enabled agent (or the caller) can fetch the diff; inline
+        # `diff` wins. Gitea sends patch_url alongside diff_url, so accept either.
+        diff_url = pull_request.get("diff_url") or pull_request.get("patch_url")
+        if diff_url:
+            lines.extend(["", f"Diff URL: {diff_url}"])
+
+    peer_id = None
+    if repo and number is not None:
+        peer_id = f"{repo}#{number}"
+    elif number is not None:
+        peer_id = f"pr#{number}"
+    return "\n".join(lines), peer_id, _github_actor(body)
+
+
+def _repo_full_name(body: dict[str, Any]) -> str | None:
+    repo = body.get("repository")
+    if isinstance(repo, dict) and isinstance(repo.get("full_name"), str):
+        return repo["full_name"] or None
+    return None
+
+
+def _github_actor(body: dict[str, Any]) -> str:
+    sender = body.get("sender")
+    if isinstance(sender, dict) and isinstance(sender.get("login"), str) and sender["login"]:
+        return sender["login"]
+    return "webhook"
+
+
+def _inline_diff(body: dict[str, Any], pull_request: dict[str, Any]) -> str | None:
+    for source in (body, pull_request):
+        diff = source.get("diff")
+        if isinstance(diff, str) and diff.strip():
+            return diff.strip()
+    return None
+
+
+# ------------------------------------------------------------------- params resolution
+
+
+async def _default_fetch(url: str, headers: dict[str, str]) -> dict[str, Any]:
+    """GET a params source over HTTP. Imported lazily so callers that only pass inline
+    params carry no httpx dependency."""
+    import httpx
+
+    request_headers = {"accept": "application/json", **headers}
+    try:
+        async with httpx.AsyncClient(timeout=30.0) as client:
+            response = await client.get(url, headers=request_headers)
+            response.raise_for_status()
+            return response.json()
+    except httpx.HTTPError as exc:
+        raise WebhookError(f"params source request failed: {exc}") from exc
+    except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
+        raise WebhookError(f"params source returned invalid JSON: {exc}") from exc
+
+
+async def resolve_remote_params(
+    url: str,
+    headers: dict[str, str] | None = None,
+    *,
+    fetch: ParamsFetcher | None = None,
+) -> tuple[dict[str, Any], dict[str, str]]:
+    """Fetch params (+ optional task_metadata) from a config-resolve URL.
+
+    Response shape (lenient)::
+
+        {"params": {...}, "task_metadata": {...}}
+
+    A bare object with no ``params`` key is treated as the params dict itself (minus a
+    top-level ``task_metadata``, which is returned separately for stamping).
+    """
+    do_fetch = fetch or (lambda u: _default_fetch(u, headers or {}))
+    payload = await do_fetch(url)
+    if not isinstance(payload, dict):
+        raise WebhookError("params source returned a non-object response")
+
+    metadata_raw = payload.get("task_metadata")
+    task_metadata = {str(k): str(v) for k, v in metadata_raw.items()} if isinstance(metadata_raw, dict) else {}
+    params = payload.get("params")
+    if not isinstance(params, dict):
+        params = {k: v for k, v in payload.items() if k != "task_metadata"}
+    return params, task_metadata
+
+
+# ------------------------------------------------------------------------- dispatch
+
+
+def _agent_reply_text(messages: object) -> str | None:
+    """Join agent-authored text from a message list (sync result or polled stream)."""
+    if not isinstance(messages, list):
+        return None
+    parts = []
+    for message in messages:
+        content = getattr(message, "content", None)
+        if (
+            content is not None
+            and getattr(content, "author", None) == "agent"
+            and getattr(content, "type", None) == "text"
+        ):
+            text = (getattr(content, "content", "") or "").strip()
+            if text:
+                parts.append(text)
+    return "\n\n".join(parts) if parts else None
+
+
+async def handle_webhook(
+    *,
+    agent_name: str,
+    payload: dict[str, Any],
+    acp_type: Literal["sync", "async"] = "sync",
+    shaper: Literal["generic", "github_pr"] = "generic",
+    channel: str | None = None,
+    params: dict[str, Any] | None = None,
+    params_source: str | None = None,
+    params_source_headers: dict[str, str] | None = None,
+    peer_id: str | None = None,
+    extra_task_metadata: dict[str, str] | None = None,
+    wait: bool = False,
+    fetch: ParamsFetcher | None = None,
+) -> WebhookResult:
+    """Drive an agent turn from a webhook payload, agent-side, via the ADK client.
+
+    - Shapes the payload (generic or GitHub PR) into a prompt + conversation scope.
+    - Resolves task params: inline ``params``, or fetched from ``params_source``
+      (config-by-id). When both are given, the fetched params replace the inline ones.
+      The platform never interprets params — they're forwarded to the
+      agent as ``task/create`` params.
+    - Get-or-creates a task keyed on a stable session key, so repeat events fold in.
+    - Sends the turn (sync → message/send returns the reply inline; async → event/send,
+      with optional ``wait`` to poll for the reply).
+    """
+    channel = channel or shaper
+    if shaper == "github_pr":
+        text, derived_peer, sender = shape_github_pr(payload)
+        peer_id = peer_id or derived_peer
+    else:
+        text, sender = render_generic(payload), "webhook"
+
+    task_metadata: dict[str, str] = {"channel": channel, "sender_id": sender}
+    if peer_id:
+        task_metadata["peer_id"] = peer_id
+
+    resolved_params = dict(params) if params else {}
+    if params_source:
+        resolved_params, source_metadata = await resolve_remote_params(
+            params_source, params_source_headers, fetch=fetch
+        )
+        # Source metadata + caller extras never override the canonical fields above.
+        for key, value in {**source_metadata, **(extra_task_metadata or {})}.items():
+            task_metadata.setdefault(key, str(value))
+    elif extra_task_metadata:
+        for key, value in extra_task_metadata.items():
+            task_metadata.setdefault(key, str(value))
+
+    name = session_key(agent_name, channel, peer_id or "")
+    # task/create carries only name/params (CreateTaskParams has no task_metadata field),
+    # so we create first, then stamp task_metadata via a follow-up update below.
+    task = await adk.acp.create_task(
+        name=name,
+        agent_name=agent_name,
+        params=resolved_params or None,
+    )
+
+    # Best-effort: stamp the resolved task_metadata (channel/sender/peer_id, plus the
+    # display_name etc. from params_source) onto the task so it's labeled in the UI.
+    # Failure must never break the run — the metadata is also returned on the result.
+    if task_metadata:
+        try:
+            merged_task_metadata = {
+                **_task_metadata_dict(getattr(task, "task_metadata", None)),
+                **task_metadata,
+            }
+            await adk.tasks.update(task_id=task.id, task_metadata=merged_task_metadata)
+        except Exception:
+            logger.warning("Failed to stamp task_metadata on task %s", task.id, exc_info=True)
+
+    content = TextContent(author="user", content=text, format="markdown")
+
+    if acp_type == "sync":
+        messages = await adk.acp.send_message(task_id=task.id, agent_name=agent_name, content=content)
+        return WebhookResult(task_id=task.id, reply=_agent_reply_text(messages), task_metadata=task_metadata)
+
+    # Async: when we'll wait for the reply, snapshot existing message ids BEFORE the
+    # event so a reused task's prior reply (session continuity) isn't mistaken for it.
+    if wait:
+        seen_ids, seen_count = await _message_snapshot(task.id)
+        await adk.acp.send_event(task_id=task.id, agent_name=agent_name, content=content)
+        reply = await _await_reply(task.id, seen_ids, seen_count=seen_count)
+    else:
+        await adk.acp.send_event(task_id=task.id, agent_name=agent_name, content=content)
+        reply = None
+    return WebhookResult(task_id=task.id, reply=reply, task_metadata=task_metadata)
+
+
+def _task_metadata_dict(value: object) -> dict[str, Any]:
+    if isinstance(value, Mapping):
+        return dict(value)
+    return {}
+
+
+async def _message_snapshot(task_id: str) -> tuple[set[str], int]:
+    messages = await adk.messages.list(task_id=task_id)
+    messages = messages or []
+    return {mid for m in messages if (mid := getattr(m, "id", None)) is not None}, len(messages)
+
+
+async def _message_ids(task_id: str) -> set[str]:
+    # Only track real ids. Keeping None in the set would let a later id-less message
+    # collide with it and be wrongly treated as already-seen (dropping a fresh reply).
+    seen_ids, _ = await _message_snapshot(task_id)
+    return seen_ids
+
+
+async def _await_reply(
+    task_id: str,
+    seen_ids: set[str],
+    *,
+    seen_count: int | None = None,
+    timeout_s: float = 120.0,
+    interval_s: float = 2.0,
+    quiescence_s: float = 6.0,
+) -> str | None:
+    """Poll for THIS turn's reply — agent text in messages that weren't present before
+    the event — until it settles (unchanged for ``quiescence_s``) or times out. Filtering
+    on new message ids avoids returning a stale prior reply on a reused task."""
+    import asyncio
+
+    # Budget against the event-loop clock rather than summing ``interval_s``: the sum
+    # ignores time spent in ``messages.list`` and never advances when interval_s <= 0.
+    clock = asyncio.get_running_loop().time
+    deadline = clock() + timeout_s
+    last: str | None = None
+    stable_since = 0.0
+    while clock() < deadline:
+        await asyncio.sleep(interval_s)
+        messages = await adk.messages.list(task_id=task_id)
+        new = []
+        for index, message in enumerate(messages or []):
+            mid = getattr(message, "id", None)
+            if mid is not None and mid not in seen_ids:
+                new.append(message)
+            elif mid is None and seen_count is not None and index >= seen_count:
+                new.append(message)
+        text = _agent_reply_text(new)
+        if text and text == last:
+            if clock() - stable_since >= quiescence_s:
+                return text
+        elif text:
+            last, stable_since = text, clock()
+    return last
diff --git a/tests/lib/test_webhooks.py b/tests/lib/test_webhooks.py
new file mode 100644
index 000000000..e42fac9dd
--- /dev/null
+++ b/tests/lib/test_webhooks.py
@@ -0,0 +1,267 @@
+"""Unit tests for the SDK webhook helper (agentex.lib.sdk.utils.webhooks)."""
+
+from __future__ import annotations
+
+from types import SimpleNamespace
+from unittest.mock import AsyncMock
+
+import pytest
+
+from agentex.lib import adk
+from agentex.lib.sdk.utils.webhooks import (
+    WebhookError,
+    session_key,
+    handle_webhook,
+    render_generic,
+    shape_github_pr,
+    resolve_remote_params,
+)
+
+
+def _pr_payload(**pr_overrides) -> dict:
+    pr = {
+        "number": 42,
+        "title": "Add retry to uploader",
+        "body": "Adds backoff on 503.",
+        "html_url": "https://example.com/acme/widgets/pull/42",
+    }
+    pr.update(pr_overrides)
+    return {
+        "action": "opened",
+        "repository": {"full_name": "acme/widgets"},
+        "sender": {"login": "octocat"},
+        "pull_request": pr,
+    }
+
+
+class TestSessionKey:
+    def test_stable_and_folds_same_conversation(self):
+        a = session_key("agent-1", "github_pr", "acme/widgets#42")
+        b = session_key("agent-1", "github_pr", "acme/widgets#42")
+        assert a == b and a.startswith("wh-github_pr-")
+
+    def test_differs_by_peer(self):
+        assert session_key("a", "github_pr", "r#1") != session_key("a", "github_pr", "r#2")
+
+
+class TestShaping:
+    def test_render_generic_prefers_text_field(self):
+        assert render_generic({"text": "hello"}) == "hello"
+
+    def test_render_generic_falls_back_to_json(self):
+        assert "zen" in render_generic({"zen": "be awesome"})
+
+    def test_render_generic_matches_keys_case_insensitively(self):
+        assert render_generic({"Message": "hi there"}) == "hi there"
+
+    def test_render_generic_supports_broadened_keys(self):
+        assert render_generic({"description": "do the thing"}) == "do the thing"
+
+    def test_github_pr_shape(self):
+        text, peer, sender = shape_github_pr(_pr_payload())
+        assert "Pull request acme/widgets#42: Add retry to uploader" in text
+        assert "Action: opened" in text
+        assert "Adds backoff on 503." in text
+        assert peer == "acme/widgets#42"
+        assert sender == "octocat"
+
+    def test_github_pr_includes_diff(self):
+        body = _pr_payload()
+        body["pull_request"]["diff"] = "diff --git a/x b/x\n+line"
+        text, _, _ = shape_github_pr(body)
+        assert "Diff:" in text and "+line" in text
+
+    def test_non_pr_payload_falls_back_to_generic(self):
+        text, peer, _ = shape_github_pr({"zen": "be awesome", "hook_id": 1})
+        assert "Pull request" not in text
+        assert "be awesome" in text
+        assert peer is None
+
+
+class TestResolveRemoteParams:
+    async def test_envelope_with_params_and_metadata(self):
+        async def fetch(_url):
+            return {"params": {"system_prompt": "x", "model": "m"}, "task_metadata": {"cfg": "1"}}
+
+        params, md = await resolve_remote_params("https://h/resolve", fetch=fetch)
+        assert params == {"system_prompt": "x", "model": "m"}
+        assert md == {"cfg": "1"}
+
+    async def test_bare_object_is_params_minus_task_metadata(self):
+        async def fetch(_url):
+            return {"system_prompt": "x", "task_metadata": {"cfg": "1"}}
+
+        params, md = await resolve_remote_params("https://h/resolve", fetch=fetch)
+        assert params == {"system_prompt": "x"}  # task_metadata stripped from params
+        assert md == {"cfg": "1"}
+
+    async def test_non_object_raises(self):
+        async def fetch(_url):
+            return ["nope"]
+
+        with pytest.raises(WebhookError):
+            await resolve_remote_params("https://h/resolve", fetch=fetch)
+
+
+def _agent_msg(text: str):
+    return SimpleNamespace(content=SimpleNamespace(author="agent", type="text", content=text))
+
+
+class TestHandleWebhook:
+    @pytest.fixture(autouse=True)
+    def _mock_adk(self, monkeypatch):
+        self.created = {}
+        self.sent = {}
+        self.stamped = {}
+        self.created_task_metadata = {}
+
+        async def create_task(*, name, agent_name, params=None, request=None, **_):
+            self.created = {"name": name, "agent_name": agent_name, "params": params, "request": request}
+            return SimpleNamespace(id="task-1", task_metadata=self.created_task_metadata)
+
+        async def send_message(*, task_id, agent_name, content, **_):
+            self.sent = {"task_id": task_id, "content": content}
+            return [_agent_msg("Looks good — ship it.")]
+
+        async def update_task(*, task_id, task_metadata=None, **_):
+            self.stamped = {"task_id": task_id, "task_metadata": task_metadata}
+            return SimpleNamespace(id=task_id)
+
+        send_event = AsyncMock()
+        monkeypatch.setattr(adk.acp, "create_task", create_task)
+        monkeypatch.setattr(adk.acp, "send_message", send_message)
+        monkeypatch.setattr(adk.acp, "send_event", send_event)
+        monkeypatch.setattr(adk.tasks, "update", update_task)
+        self.send_event = send_event
+        yield
+
+    async def test_sync_github_pr_with_config_by_id(self):
+        async def fake_resolve(_url):
+            return {"params": {"system_prompt": "review"}, "task_metadata": {"agent_config_id": "cfg-9"}}
+
+        result = await handle_webhook(
+            agent_name="golden-agent",
+            payload=_pr_payload(),
+            acp_type="sync",
+            shaper="github_pr",
+            params_source="https://h/v5/agent_configs/cfg-9/resolve",
+            fetch=fake_resolve,
+        )
+
+        assert result.reply == "Looks good — ship it."
+        assert self.created["params"] == {"system_prompt": "review"}
+        # metadata is returned on the result (SDK task/create can't carry it)
+        md = result.task_metadata
+        assert md["channel"] == "github_pr"
+        assert md["peer_id"] == "acme/widgets#42"
+        assert md["agent_config_id"] == "cfg-9"
+        # task folded on a stable session key
+        assert self.created["name"].startswith("wh-github_pr-")
+        # metadata is also stamped onto the task (best-effort) so it's labeled in the UI
+        assert self.stamped["task_id"] == "task-1"
+        assert self.stamped["task_metadata"]["peer_id"] == "acme/widgets#42"
+        assert self.stamped["task_metadata"]["agent_config_id"] == "cfg-9"
+
+    async def test_inline_params_no_fetch(self):
+        result = await handle_webhook(
+            agent_name="a",
+            payload={"text": "hi"},
+            acp_type="sync",
+            params={"system_prompt": "inline"},
+        )
+        assert result.reply == "Looks good — ship it."
+        assert self.created["params"] == {"system_prompt": "inline"}
+
+    async def test_source_metadata_cannot_override_canonical(self):
+        async def fake_resolve(_url):
+            return {"params": {}, "task_metadata": {"channel": "spoofed"}}
+
+        result = await handle_webhook(
+            agent_name="a",
+            payload=_pr_payload(),
+            shaper="github_pr",
+            params_source="https://h/resolve",
+            fetch=fake_resolve,
+        )
+        assert result.task_metadata["channel"] == "github_pr"
+
+    async def test_task_metadata_preserves_existing_keys_on_reused_task(self):
+        self.created_task_metadata = {
+            "labels": ["customer-facing"],
+            "agent_config_id": "old-cfg",
+            "channel": "old-channel",
+        }
+
+        async def fake_resolve(_url):
+            return {"params": {}, "task_metadata": {"agent_config_id": "cfg-9"}}
+
+        await handle_webhook(
+            agent_name="a",
+            payload=_pr_payload(),
+            shaper="github_pr",
+            params_source="https://h/resolve",
+            fetch=fake_resolve,
+        )
+
+        stamped_metadata = self.stamped["task_metadata"]
+        assert stamped_metadata["labels"] == ["customer-facing"]
+        assert stamped_metadata["agent_config_id"] == "cfg-9"
+        assert stamped_metadata["channel"] == "github_pr"
+
+    async def test_async_without_wait_sends_event_and_returns_no_reply(self):
+        result = await handle_webhook(agent_name="a", payload={"text": "go"}, acp_type="async", wait=False)
+        assert result.reply is None
+        self.send_event.assert_awaited_once()
+
+
+class TestAwaitReplyIgnoresStalePriorReply:
+    async def test_returns_only_new_agent_text_on_reused_task(self, monkeypatch):
+        from agentex.lib.sdk.utils.webhooks import _await_reply
+
+        old = _agent_msg("OLD reply")
+        old.id = "m1"
+        new = _agent_msg("NEW reply")
+        new.id = "m2"
+        calls = {"n": 0}
+
+        async def fake_list(*, task_id, **_):
+            calls["n"] += 1
+            return [old] if calls["n"] < 2 else [old, new]  # new appears on 2nd poll
+
+        async def no_sleep(_seconds):
+            return None
+
+        monkeypatch.setattr(adk.messages, "list", fake_list)
+        monkeypatch.setattr("asyncio.sleep", no_sleep)
+
+        # baseline = the pre-existing old message; only m2 (NEW) should be returned
+        reply = await _await_reply("task-1", {"m1"}, interval_s=0.0, quiescence_s=0.0)
+        assert reply == "NEW reply"
+
+    async def test_returns_idless_agent_text_after_snapshot(self, monkeypatch):
+        from agentex.lib.sdk.utils.webhooks import _await_reply
+
+        old = _agent_msg("OLD reply")
+        old.id = None
+        new = _agent_msg("NEW reply")
+        new.id = None
+        calls = {"n": 0}
+
+        async def fake_list(*, task_id, **_):
+            calls["n"] += 1
+            return [old] if calls["n"] < 2 else [old, new]
+
+        async def no_sleep(_seconds):
+            return None
+
+        monkeypatch.setattr(adk.messages, "list", fake_list)
+        monkeypatch.setattr("asyncio.sleep", no_sleep)
+
+        reply = await _await_reply(
+            "task-1",
+            set(),
+            seen_count=1,
+            interval_s=0.0,
+            quiescence_s=0.0,
+        )
+        assert reply == "NEW reply"