Skip to content

Commit 514075d

Browse files
danielmillerpstainless-app[bot]claude
authored
feat(sdk): add webhook helper for forward-route handlers (#419)
Co-authored-by: stainless-app[bot] <142633134+stainless-app[bot]@users.noreply.github.com> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 7f6d70a commit 514075d

3 files changed

Lines changed: 658 additions & 2 deletions

File tree

.stats.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
configured_endpoints: 64
2-
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/sgp/agentex-sdk-ae2571b5ac5d337ba5ced527cec0ff6e3088296fa67c3c836ed5a06544b25cb8.yml
3-
openapi_spec_hash: 962a2f20444c7823fd3a34f95365146e
2+
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/sgp/agentex-sdk-cd43ba4b554ca024dd7ee7b74e4f4700a743282c17def704a0967e6ff251c09b.yml
3+
openapi_spec_hash: 9369ccc9c0289e9d6f641a526d244d1c
44
config_hash: 138b7c0b394e7393133c8ff16a6d0eb3
Lines changed: 389 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
"""Drive an agent turn from an inbound webhook, inside a forward-route handler.
2+
3+
The Agentex server already exposes a webhook ingress: a request to
4+
``/agents/forward/name/{agent}/{path}`` is signature-verified (GitHub ``sha256=`` /
5+
Slack ``v0:`` HMAC via the agent's registered keys) and proxied to the agent's own
6+
HTTP route. This helper is what that route handler calls to turn the inbound payload
7+
into an agent turn — without each agent re-implementing payload shaping, config
8+
resolution, session continuity, and reply handling.
9+
10+
Typical use inside an agent::
11+
12+
from fastapi import Request
13+
from agentex.lib.sdk.utils.webhooks import handle_webhook
14+
15+
16+
@acp.post("/github-pr")
17+
async def github_pr(request: Request):
18+
body = await request.json()
19+
result = await handle_webhook(
20+
agent_name="my-agent",
21+
payload=body,
22+
acp_type="sync",
23+
shaper="github_pr",
24+
params_source="https://<host>/public/v5/agent_configs/<id>/resolve",
25+
params_source_headers={"x-api-key": ..., "x-selected-account-id": ...},
26+
wait=True,
27+
)
28+
return {"task_id": result.task_id, "reply": result.reply}
29+
30+
Config-by-id: pass ``params_source`` pointing at the platform's config-resolve
31+
endpoint; the resolved params (e.g. system_prompt / harness / model / tools) are
32+
forwarded opaquely to ``task/create``. Or pass inline ``params`` for a one-off.
33+
"""
34+
35+
from __future__ import annotations
36+
37+
import json
38+
import hashlib
39+
from typing import Any, Literal
40+
from dataclasses import field, dataclass
41+
from collections.abc import Mapping, Callable, Awaitable
42+
43+
from agentex.lib import adk
44+
from agentex.lib.utils.logging import make_logger
45+
from agentex.types.task_message_content import TextContent
46+
47+
logger = make_logger(__name__)

# Injectable params fetcher (url -> JSON). Default uses httpx; tests inject a fake.
# The fetcher is called with the URL only: ``resolve_remote_params`` binds request
# headers into the default fetcher, and does not pass them to an injected one.
ParamsFetcher = Callable[[str], Awaitable[dict[str, Any]]]

# Character limits applied by ``shape_github_pr`` when it embeds the pull-request
# description (MAX_BODY_CHARS) and an inline diff (MAX_DIFF_CHARS) in the prompt.
MAX_BODY_CHARS = 4000
MAX_DIFF_CHARS = 30000
54+
55+
56+
class WebhookError(RuntimeError):
    """Raised when a webhook turn cannot be driven (e.g. params resolution failed).

    In this module it is raised by the default params fetcher (HTTP failure or an
    invalid-JSON response) and by ``resolve_remote_params`` when the params source
    returns something other than a JSON object.
    """
58+
59+
60+
@dataclass
class WebhookResult:
    """Outcome of ``handle_webhook``: the task that was driven plus any reply text."""

    # Id of the task the turn was sent to.
    task_id: str
    # Sync agents reply inline. For async agents, ``reply`` is None unless ``wait`` was
    # set, in which case it is the polled reply: the settled text, or — if polling
    # timed out first — the most recent agent text seen (None if none appeared).
    reply: str | None = None
    # The channel/sender/peer_id metadata (plus params-source and caller extras) that
    # ``handle_webhook`` computed for this turn.
    task_metadata: dict[str, str] = field(default_factory=dict)
67+
68+
69+
# --------------------------------------------------------------------------- shaping
70+
71+
72+
def session_key(agent_name: str, channel: str, peer_id: str) -> str:
    """Build the stable per-conversation task name used for get-or-create.

    The name is ``wh-<channel>-<digest>``, where ``digest`` is the first 16 hex
    characters of the SHA-1 of ``agent_name:channel:peer_id``. An empty ``peer_id``
    is replaced by ``"main"``, so repeat events from the same source map to the same
    name instead of producing a new one each time.
    """
    scope = peer_id if peer_id else "main"
    hasher = hashlib.sha1()
    hasher.update(f"{agent_name}:{channel}:{scope}".encode())
    short_digest = hasher.hexdigest()[:16]
    return f"wh-{channel}-{short_digest}"
78+
79+
80+
# Top-level fields a generic webhook payload might carry its prompt in, in priority
# order. Matched case-insensitively against the payload's keys: ``render_generic``
# lowercases the payload keys and returns the first of these holding a non-blank string.
GENERIC_PROMPT_KEYS = (
    "text",
    "message",
    "prompt",
    "goal",
    "content",
    "body",
    "description",
    "title",
)
92+
93+
94+
def render_generic(body: dict[str, Any]) -> str:
    """Turn a generic payload into prompt text.

    Returns the stripped value of the first GENERIC_PROMPT_KEYS entry (looked up
    case-insensitively among the payload's string keys) that holds a non-blank
    string. If none does, returns the payload pretty-printed as JSON, cut to the
    first 8000 characters.
    """
    by_lowered_key: dict[str, Any] = {}
    for name, value in body.items():
        if isinstance(name, str):
            by_lowered_key[name.lower()] = value

    for candidate in (by_lowered_key.get(name) for name in GENERIC_PROMPT_KEYS):
        if not isinstance(candidate, str):
            continue
        stripped = candidate.strip()
        if stripped:
            return stripped

    rendered = json.dumps(body, indent=2)
    return rendered[:8000]
103+
104+
105+
def shape_github_pr(body: dict[str, Any]) -> tuple[str, str | None, str]:
    """Shape a GitHub/Gitea pull-request webhook into ``(prompt, peer_id, sender)``.

    ``peer_id`` is ``repo#number`` (or ``pr#number`` when the repository name is
    missing) so repeated events for the same PR fold into one task; it is None when
    the payload carries no PR number. Payloads without a ``pull_request`` object
    (ping, issue, ...) fall back to generic rendering with no ``peer_id``.
    """
    pr = body.get("pull_request")
    if not isinstance(pr, dict):
        return render_generic(body), None, _github_actor(body)

    repo = _repo_full_name(body)
    number = pr.get("number")
    title = (pr.get("title") or "").strip()
    action = (body.get("action") or "").strip()
    description = (pr.get("body") or "").strip()
    link = pr.get("html_url") or pr.get("url")

    # The header and the conversation scope are derived from the same repo/number pair.
    if number is None:
        header, peer_id = "Pull request", None
    elif repo:
        header, peer_id = f"Pull request {repo}#{number}", f"{repo}#{number}"
    else:
        header, peer_id = f"Pull request #{number}", f"pr#{number}"

    prompt_lines = [f"{header}: {title}" if title else header]
    if action:
        prompt_lines.append(f"Action: {action}")
    if link:
        prompt_lines.append(f"URL: {link}")
    if description:
        prompt_lines += ["", "Description:", description[:MAX_BODY_CHARS]]

    inline = _inline_diff(body, pr)
    if inline:
        prompt_lines += ["", "Diff:", inline[:MAX_DIFF_CHARS]]
    else:
        # Standard GitHub/Gitea payloads carry a diff/patch URL rather than the patch
        # body. Surface it so a tool-enabled agent (or the caller) can fetch the diff;
        # an inline `diff` takes precedence. Either diff_url or patch_url is accepted.
        diff_url = pr.get("diff_url") or pr.get("patch_url")
        if diff_url:
            prompt_lines += ["", f"Diff URL: {diff_url}"]

    return "\n".join(prompt_lines), peer_id, _github_actor(body)
154+
155+
156+
def _repo_full_name(body: dict[str, Any]) -> str | None:
    """Return ``body["repository"]["full_name"]`` if it is a non-empty string, else None."""
    repository = body.get("repository")
    if not isinstance(repository, dict):
        return None
    full_name = repository.get("full_name")
    if isinstance(full_name, str) and full_name:
        return full_name
    return None
161+
162+
163+
def _github_actor(body: dict[str, Any]) -> str:
    """Return ``body["sender"]["login"]`` if it is a non-empty string, else ``"webhook"``."""
    sender = body.get("sender")
    login = sender.get("login") if isinstance(sender, dict) else None
    if isinstance(login, str) and login:
        return login
    return "webhook"
168+
169+
170+
def _inline_diff(body: dict[str, Any], pull_request: dict[str, Any]) -> str | None:
    """Return the first non-blank string ``diff`` field, stripped.

    The top-level payload is checked before the ``pull_request`` object; None is
    returned when neither carries a usable ``diff``.
    """
    for candidate in (source.get("diff") for source in (body, pull_request)):
        if not isinstance(candidate, str):
            continue
        trimmed = candidate.strip()
        if trimmed:
            return trimmed
    return None
176+
177+
178+
# ------------------------------------------------------------------- params resolution
179+
180+
181+
async def _default_fetch(url: str, headers: dict[str, str]) -> dict[str, Any]:
    """GET a params source over HTTP and return the decoded JSON body.

    httpx is imported lazily so callers that only pass inline params carry no httpx
    dependency.

    Args:
        url: The params-source URL to fetch.
        headers: Extra request headers; they are merged over a default
            ``accept: application/json`` header, so a caller-supplied ``accept`` wins.

    Returns:
        Whatever ``response.json()`` decodes to. The shape is not validated here;
        callers must check it.

    Raises:
        WebhookError: If the request fails (transport error, non-2xx status, or a
            URL httpx rejects as invalid) or the response body is not valid JSON.
    """
    import httpx

    request_headers = {"accept": "application/json", **headers}
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(url, headers=request_headers)
            response.raise_for_status()
            return response.json()
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        # httpx.InvalidURL is not a subclass of httpx.HTTPError, so it is listed
        # explicitly; otherwise a malformed params_source would escape as a raw
        # httpx exception instead of WebhookError.
        raise WebhookError(f"params source request failed: {exc}") from exc
    except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
        raise WebhookError(f"params source returned invalid JSON: {exc}") from exc
196+
197+
198+
async def resolve_remote_params(
    url: str,
    headers: dict[str, str] | None = None,
    *,
    fetch: ParamsFetcher | None = None,
) -> tuple[dict[str, Any], dict[str, str]]:
    """Fetch params (+ optional task_metadata) from a config-resolve URL.

    Response shape (lenient)::

        {"params": {...}, "task_metadata": {...}}

    A bare object with no ``params`` key is treated as the params dict itself (minus a
    top-level ``task_metadata``, which is returned separately for stamping). An
    envelope whose ``params`` is explicitly ``null`` resolves to empty params.

    Args:
        url: The params-source URL.
        headers: Request headers for the default HTTP fetcher. They are NOT passed to
            an injected ``fetch``, which receives only the URL.
        fetch: Optional replacement fetcher (``url -> JSON``), e.g. a test fake.

    Returns:
        ``(params, task_metadata)``. ``task_metadata`` keys and values are coerced to
        ``str``; it is empty when the response carries no ``task_metadata`` object.

    Raises:
        WebhookError: If the fetched payload is not a JSON object (the default
            fetcher also raises it on request/JSON failures).
    """
    do_fetch = fetch or (lambda u: _default_fetch(u, headers or {}))
    payload = await do_fetch(url)
    if not isinstance(payload, dict):
        raise WebhookError("params source returned a non-object response")

    metadata_raw = payload.get("task_metadata")
    task_metadata = {str(k): str(v) for k, v in metadata_raw.items()} if isinstance(metadata_raw, dict) else {}

    params = payload.get("params")
    if isinstance(params, dict):
        return params, task_metadata
    if params is None and "params" in payload:
        # Envelope form with an explicit null: there are no params. Falling through
        # to the bare-object branch would forward {"params": None} as task params.
        return {}, task_metadata
    # Bare object: everything except task_metadata is the params dict.
    return {k: v for k, v in payload.items() if k != "task_metadata"}, task_metadata
224+
225+
226+
# ------------------------------------------------------------------------- dispatch
227+
228+
229+
def _agent_reply_text(messages: object) -> str | None:
    """Join agent-authored text from a message list (sync result or polled stream).

    Only items whose ``content`` has ``author == "agent"`` and ``type == "text"``
    contribute; their stripped, non-empty text is joined with blank lines. Returns
    None when ``messages`` is not a list or no such text is found.
    """
    if not isinstance(messages, list):
        return None

    chunks = []
    for item in messages:
        body = getattr(item, "content", None)
        if body is None:
            continue
        if getattr(body, "author", None) != "agent":
            continue
        if getattr(body, "type", None) != "text":
            continue
        stripped = (getattr(body, "content", "") or "").strip()
        if stripped:
            chunks.append(stripped)

    if not chunks:
        return None
    return "\n\n".join(chunks)
245+
246+
247+
async def handle_webhook(
    *,
    agent_name: str,
    payload: dict[str, Any],
    acp_type: Literal["sync", "async"] = "sync",
    shaper: Literal["generic", "github_pr"] = "generic",
    channel: str | None = None,
    params: dict[str, Any] | None = None,
    params_source: str | None = None,
    params_source_headers: dict[str, str] | None = None,
    peer_id: str | None = None,
    extra_task_metadata: dict[str, str] | None = None,
    wait: bool = False,
    fetch: ParamsFetcher | None = None,
) -> WebhookResult:
    """Drive an agent turn from a webhook payload, agent-side, via the ADK client.

    - Shapes the payload (generic or GitHub PR) into a prompt + conversation scope.
      ``channel`` defaults to the shaper name; an explicit ``peer_id`` wins over the
      one derived from a PR payload.
    - Resolves task params: inline ``params``, or fetched from ``params_source``
      (config-by-id). When ``params_source`` is set its result replaces inline
      ``params``. The platform never interprets params — they're forwarded to the
      agent as ``task/create`` params.
    - Get-or-creates a task keyed on a stable session key, so repeat events fold in.
    - Sends the turn (sync → message/send returns the reply inline; async → event/send,
      with optional ``wait`` to poll for the reply). ``wait`` has no effect for sync.
    """
    channel = channel or shaper
    if shaper == "github_pr":
        text, derived_peer, sender = shape_github_pr(payload)
        peer_id = peer_id or derived_peer
    else:
        text, sender = render_generic(payload), "webhook"

    # Canonical fields first; everything merged in later goes through setdefault.
    task_metadata: dict[str, str] = {"channel": channel, "sender_id": sender}
    if peer_id:
        task_metadata["peer_id"] = peer_id

    resolved_params = dict(params) if params else {}
    source_metadata: dict[str, str] = {}
    if params_source:
        resolved_params, source_metadata = await resolve_remote_params(
            params_source, params_source_headers, fetch=fetch
        )

    # Source metadata + caller extras never override the canonical fields above;
    # where both define a key, the caller's extra value is the one considered.
    supplemental = {**source_metadata, **(extra_task_metadata or {})}
    for key, value in supplemental.items():
        task_metadata.setdefault(key, str(value))

    # task/create carries only name/params (CreateTaskParams has no task_metadata field),
    # so we create first, then stamp task_metadata via a follow-up update below.
    task = await adk.acp.create_task(
        name=session_key(agent_name, channel, peer_id or ""),
        agent_name=agent_name,
        params=resolved_params or None,
    )

    # Best-effort: stamp the resolved task_metadata (channel/sender/peer_id, plus
    # whatever params_source supplied) onto the task. Failure must never break the
    # run — the metadata is also returned on the result.
    if task_metadata:
        try:
            existing_metadata = _task_metadata_dict(getattr(task, "task_metadata", None))
            await adk.tasks.update(
                task_id=task.id,
                task_metadata={**existing_metadata, **task_metadata},
            )
        except Exception:
            logger.warning("Failed to stamp task_metadata on task %s", task.id, exc_info=True)

    content = TextContent(author="user", content=text, format="markdown")

    if acp_type == "sync":
        messages = await adk.acp.send_message(task_id=task.id, agent_name=agent_name, content=content)
        return WebhookResult(task_id=task.id, reply=_agent_reply_text(messages), task_metadata=task_metadata)

    # Async: when we'll wait for the reply, snapshot existing messages BEFORE the
    # event so a reused task's prior reply (session continuity) isn't mistaken for it.
    snapshot = await _message_snapshot(task.id) if wait else None
    await adk.acp.send_event(task_id=task.id, agent_name=agent_name, content=content)

    reply = None
    if snapshot is not None:
        seen_ids, seen_count = snapshot
        reply = await _await_reply(task.id, seen_ids, seen_count=seen_count)
    return WebhookResult(task_id=task.id, reply=reply, task_metadata=task_metadata)
333+
334+
335+
def _task_metadata_dict(value: object) -> dict[str, Any]:
    """Return a plain-dict copy of ``value`` if it is a Mapping, otherwise an empty dict."""
    return dict(value) if isinstance(value, Mapping) else {}
339+
340+
341+
async def _message_snapshot(task_id: str) -> tuple[set[str], int]:
    """List the task's messages and return ``(ids, count)``.

    ``ids`` holds only the non-None ``id`` attributes; ``count`` is the total number
    of messages listed, including any without an id.
    """
    listed = (await adk.messages.list(task_id=task_id)) or []
    known_ids = set()
    for item in listed:
        item_id = getattr(item, "id", None)
        if item_id is not None:
            known_ids.add(item_id)
    return known_ids, len(listed)
345+
346+
347+
async def _message_ids(task_id: str) -> set[str]:
    """Return just the set of message ids from ``_message_snapshot``."""
    # Only real ids are tracked. Keeping None in the set would let a later id-less
    # message collide with it and be wrongly treated as already-seen (dropping a
    # fresh reply).
    snapshot = await _message_snapshot(task_id)
    return snapshot[0]
352+
353+
354+
async def _await_reply(
    task_id: str,
    seen_ids: set[str | None],
    *,
    seen_count: int | None = None,
    timeout_s: float = 120.0,
    interval_s: float = 2.0,
    quiescence_s: float = 6.0,
) -> str | None:
    """Poll for THIS turn's reply until it settles or the poll budget runs out.

    A message counts as new when its id is not in ``seen_ids``, or — for messages
    without an id — when ``seen_count`` is given and its list index is at or beyond
    ``seen_count``. Filtering on new messages avoids returning a stale prior reply
    on a reused task.

    Returns the agent text once it has stayed unchanged for ``quiescence_s`` worth
    of polls. If ``timeout_s`` worth of polls elapses first, returns the last agent
    text observed (None if none was). Elapsed time is counted in ``interval_s``
    steps, one per poll.
    """
    import asyncio

    def _is_new(position: int, message: object) -> bool:
        message_id = getattr(message, "id", None)
        if message_id is not None:
            return message_id not in seen_ids
        return seen_count is not None and position >= seen_count

    elapsed = 0.0
    latest: str | None = None
    unchanged_for = 0.0
    while elapsed < timeout_s:
        await asyncio.sleep(interval_s)
        elapsed += interval_s

        listed = await adk.messages.list(task_id=task_id)
        fresh = [message for position, message in enumerate(listed or []) if _is_new(position, message)]
        text = _agent_reply_text(fresh)
        if not text:
            continue
        if text != latest:
            # The reply is still changing: restart the quiescence window.
            latest, unchanged_for = text, 0.0
            continue
        unchanged_for += interval_s
        if unchanged_for >= quiescence_s:
            return text
    return latest

0 commit comments

Comments
 (0)