From 239fb1ad33055f7ac2f40869719abbacf0fb908d Mon Sep 17 00:00:00 2001 From: Daniel Miller Date: Fri, 19 Jun 2026 14:51:39 -0400 Subject: [PATCH] feat(channels): add extensible channel abstraction with generic webhook ingress MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a channel layer that normalizes external surfaces into agent turns and, for channels that respond, delivers the agent's reply back. The agent-driving core is channel-agnostic; a new channel (e.g. Slack) implements one interface without touching the router. - domain/channels/base.py: InboundMessage, ChannelBinding (route -> agent + an OPAQUE params dict forwarded verbatim to task/create; the platform does not interpret it), timing-safe shared-secret / HMAC-SHA256 verifiers, and the Channel ABC. Ingress: authenticate + to_inbound. Outbound (optional, OpenClaw-style): deliver + chunk (textChunkLimit), plus deliver_reply() — the buffered dispatcher that chunks a reply and delivers each block. A plain webhook leaves outbound unset (its reply is the HTTP response); push channels (Slack) set supports_outbound and implement deliver. - domain/channels/router.py: ChannelRouter.dispatch() — task/create (get-or-create on a per-conversation session key) + the turn, branching on ACP type (sync: message/send returns the reply inline; async: event/send, reply lands on the stream). await_reply() retrieves an async agent's settled reply for responding channels. - domain/channels/webhook.py: WebhookChannel — generic HTTP ingress with a per-route secret. Accepts Authorization: Bearer / x-openclaw-webhook-secret, or an X-Hub-Signature-256 HMAC (GitHub/Gitea). Generic JSON normalization. - api/routes/channels.py: POST /channels/webhook/{route_id} (+ optional ?wait to return the reply for synchronous callers). Route bindings load from CHANNELS_WEBHOOK_ROUTES env for now (seam for a future config store). - Whitelist /channels/webhook in the auth middleware so it bypasses the agent API-key check and verifies its own per-route secret instead; register the router. - Regenerate openapi.yaml. Co-Authored-By: Claude Opus 4.8 (1M context) --- agentex/openapi.yaml | 42 ++++++ agentex/src/api/app.py | 2 + agentex/src/api/middleware_utils.py | 3 + agentex/src/api/routes/channels.py | 124 +++++++++++++++++ agentex/src/domain/channels/__init__.py | 1 + agentex/src/domain/channels/base.py | 131 ++++++++++++++++++ agentex/src/domain/channels/router.py | 171 ++++++++++++++++++++++++ agentex/src/domain/channels/webhook.py | 63 +++++++++ 8 files changed, 537 insertions(+) create mode 100644 agentex/src/api/routes/channels.py create mode 100644 agentex/src/domain/channels/__init__.py create mode 100644 agentex/src/domain/channels/base.py create mode 100644 agentex/src/domain/channels/router.py create mode 100644 agentex/src/domain/channels/webhook.py diff --git a/agentex/openapi.yaml b/agentex/openapi.yaml index b122020e..5bbec96e 100644 --- a/agentex/openapi.yaml +++ b/agentex/openapi.yaml @@ -364,6 +364,48 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /channels/webhook/{route_id}: + post: + tags: + - Channels + summary: Generic webhook channel + description: 'Authenticated webhook ingress: verifies a per-route secret/HMAC + and drives an agent turn. Auth is whitelisted at the middleware; the channel + verifies the route secret instead.' + operationId: handle_webhook_channels_webhook__route_id__post + parameters: + - name: route_id + in: path + required: true + schema: + type: string + title: Route Id + - name: wait + in: query + required: false + schema: + type: boolean + description: 'If true, wait for the agent''s reply and return it (for synchronous + callers). Default false: return immediately with the task_id.' + default: false + title: Wait + description: 'If true, wait for the agent''s reply and return it (for synchronous + callers). Default false: return immediately with the task_id.' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + type: object + additionalProperties: true + title: Response Handle Webhook Channels Webhook Route Id Post + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /tasks/{task_id}: get: tags: diff --git a/agentex/src/api/app.py b/agentex/src/api/app.py index 41a9eada..b6d3af20 100644 --- a/agentex/src/api/app.py +++ b/agentex/src/api/app.py @@ -31,6 +31,7 @@ agent_api_keys, agent_task_tracker, agents, + channels, checkpoints, deployment_history, deployments, @@ -193,6 +194,7 @@ async def handle_unexpected(request, exc): # Include all routers fastapi_app.include_router(agents.router) +fastapi_app.include_router(channels.router) fastapi_app.include_router(tasks.router) fastapi_app.include_router(messages.router) fastapi_app.include_router(spans.router) diff --git a/agentex/src/api/middleware_utils.py b/agentex/src/api/middleware_utils.py index 92ab4c22..3ab30379 100644 --- a/agentex/src/api/middleware_utils.py +++ b/agentex/src/api/middleware_utils.py @@ -28,6 +28,9 @@ "/readyz", "/ping", "/echo", + # Channels (webhook, …) bypass agentex API-key auth and verify a per-route + # shared secret / HMAC inside the channel instead. + "/channels/webhook", } DROP_HEADERS: set[str] = { diff --git a/agentex/src/api/routes/channels.py b/agentex/src/api/routes/channels.py new file mode 100644 index 00000000..3d31a8b0 --- /dev/null +++ b/agentex/src/api/routes/channels.py @@ -0,0 +1,124 @@ +"""Channels API: external surfaces (webhook now, Slack later) that drive agent turns. + +The webhook endpoint is auth-whitelisted in `middleware_utils.WHITELISTED_ROUTES` +(it bypasses the agentex API-key auth) and instead verifies a per-route shared +secret / HMAC inside the channel. Each route binds to one agent and an opaque +`params` dict forwarded verbatim to task/create (agentex does not interpret it — +the bound agent does). + +Route bindings live in the CHANNELS_WEBHOOK_ROUTES env var (JSON) for now — the +seam where a real per-config store (resolving a saved agent_config_id) plugs in later: + + CHANNELS_WEBHOOK_ROUTES='{"demo": {"secret": "", + "agent_name": "golden-agent", + "params": {"system_prompt": "You are ...", "mcps": []}}}' +""" + +from __future__ import annotations + +import json +import os + +from fastapi import APIRouter, HTTPException, Query, Request + +from src.domain.channels.base import ChannelBinding +from src.domain.channels.router import ChannelRouter +from src.domain.channels.webhook import MAX_BODY_BYTES, WebhookChannel +from src.domain.services.task_message_service import DTaskMessageService +from src.domain.use_cases.agents_acp_use_case import DAgentsACPUseCase +from src.domain.use_cases.agents_use_case import DAgentsUseCase +from src.utils.logging import make_logger + +logger = make_logger(__name__) + +router = APIRouter(prefix="/channels", tags=["Channels"]) + +# Channel registry — add "slack": SlackChannel() here when it lands. +_WEBHOOK = WebhookChannel() + + +def _webhook_binding(route_id: str) -> ChannelBinding | None: + """Resolve a webhook route's binding. Env-backed seam; replace with a config store.""" + raw = os.environ.get("CHANNELS_WEBHOOK_ROUTES") + if not raw: + return None + try: + routes = json.loads(raw) + except json.JSONDecodeError: + logger.error("[channels] CHANNELS_WEBHOOK_ROUTES is not valid JSON") + return None + cfg = routes.get(route_id) + if not isinstance(cfg, dict) or not cfg.get("secret") or not cfg.get("agent_name"): + return None + # `params` is opaque — forwarded verbatim to task/create; agentex does not + # interpret it (an agent like golden-agent reads system_prompt/mcps/etc. there). + params = cfg.get("params") + return ChannelBinding( + secret=cfg["secret"], + agent_name=cfg["agent_name"], + params=params if isinstance(params, dict) else {}, + ) + + +@router.post( + "/webhook/{route_id}", + summary="Generic webhook channel", + description="Authenticated webhook ingress: verifies a per-route secret/HMAC and " + "drives an agent turn. Auth is whitelisted at the middleware; the channel verifies " + "the route secret instead.", +) +async def handle_webhook( + route_id: str, + request: Request, + agents_acp_use_case: DAgentsACPUseCase, + agents_use_case: DAgentsUseCase, + task_message_service: DTaskMessageService, + wait: bool = Query( + False, + description="If true, wait for the agent's reply and return it (for synchronous " + "callers). Default false: return immediately with the task_id.", + ), +) -> dict: + binding = _webhook_binding(route_id) + if binding is None: + raise HTTPException(status_code=404, detail="unknown route") + + raw = await request.body() + if len(raw) > MAX_BODY_BYTES: + raise HTTPException(status_code=413, detail="payload too large") + if not _WEBHOOK.authenticate(binding, request, raw): + raise HTTPException(status_code=401, detail="unauthorized") + if "application/json" not in request.headers.get("content-type", ""): + raise HTTPException(status_code=400, detail="expected application/json") + try: + body = json.loads(raw) + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="invalid json") + if not isinstance(body, dict): + raise HTTPException(status_code=400, detail="json body must be an object") + + # Resolve the agent's ACP type so the router picks the right turn method + # (sync -> message/send returns the reply inline; async -> event/send). + agent = await agents_use_case.get(name=binding.agent_name) + + inbound = _WEBHOOK.to_inbound(route_id, body) + router_ = ChannelRouter(agents_acp_use_case, task_message_service) + result = await router_.dispatch(inbound, binding, agent.acp_type) + + # A plain webhook has no outbound push (supports_outbound is False) — its reply is + # the HTTP response. Sync agents reply inline; for async, `wait` streams it back. + # A push channel (Slack) would instead: reply = result.reply or await_reply(...); + # await deliver_reply(channel, peer_id, reply). + reply = result.reply + if reply is None and wait: + reply = await router_.await_reply(result.task_id, result.after_id) + + response = { + "ok": True, + "channel": "webhook", + "route_id": route_id, + "task_id": result.task_id, + } + if reply is not None: + response["reply"] = reply + return response diff --git a/agentex/src/domain/channels/__init__.py b/agentex/src/domain/channels/__init__.py new file mode 100644 index 00000000..79466e4d --- /dev/null +++ b/agentex/src/domain/channels/__init__.py @@ -0,0 +1 @@ +"""Channels: normalize external surfaces (webhook, Slack, …) into agent turns.""" diff --git a/agentex/src/domain/channels/base.py b/agentex/src/domain/channels/base.py new file mode 100644 index 00000000..762a78b8 --- /dev/null +++ b/agentex/src/domain/channels/base.py @@ -0,0 +1,131 @@ +"""Channel abstraction: normalize any external surface (webhook, Slack, …) into one +inbound shape, and drive an agent turn from it. + +Modeled on OpenClaw's channel plugins (github.com/openclaw/openclaw, +`src/channels/**`) and the claw0 tutorial: every platform produces the same +`InboundMessage`; the agent-driving core is channel-agnostic. A new channel +(Slack, etc.) implements `Channel` — `authenticate` + `to_inbound` for ingress, +and (when it needs to push replies) its own outbound — without touching the router. +""" + +from __future__ import annotations + +import hashlib +import hmac +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any + +from starlette.requests import Request + + +@dataclass +class InboundMessage: + """Normalized inbound event. Every channel produces this; the router only sees this.""" + + text: str + channel: str # "webhook" | "slack" | … + route_id: str = "" # which binding received it (webhook route / slack team) + peer_id: str = "" # conversation scope: DM user, channel:thread, route_id, … + sender_id: str = "" + raw: dict[str, Any] = field(default_factory=dict) + + def session_key(self, agent_name: str) -> str: + """Stable per-conversation key → reused as the agentex task name (get-or-create).""" + basis = self.peer_id or self.route_id or "main" + digest = hashlib.sha1( + f"{agent_name}:{self.channel}:{basis}".encode() + ).hexdigest()[:16] + return f"ch-{self.channel}-{digest}" + + +@dataclass +class ChannelBinding: + """A route's binding to one agent. + + `params` is an OPAQUE dict forwarded verbatim as the task/create params — the + agentex platform does not interpret it. Whatever a given agent expects there + (e.g. golden-agent's system_prompt / mcps / harness / model) is that agent's + concern, not the channel layer's. Later this can be sourced from a saved config. + """ + + secret: str + agent_name: str + params: dict[str, Any] = field(default_factory=dict) + # Headers the router forwards to the agent (auth/delegation). Empty for local/open. + forward_headers: dict[str, str] = field(default_factory=dict) + + +def verify_shared_secret(presented: str | None, secret: str) -> bool: + """Timing-safe shared-secret check (OpenClaw `safeEqualSecret`).""" + if not presented or not secret: + return False + return hmac.compare_digest(presented, secret) + + +def verify_hmac_sha256(secret: str, body: bytes, signature_header: str | None) -> bool: + """Verify an `sha256=` HMAC signature (GitHub/Gitea-style).""" + if not signature_header or not signature_header.startswith("sha256="): + return False + expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + return hmac.compare_digest(expected, signature_header) + + +class Channel(ABC): + """One external surface, modeled on OpenClaw's channel plugin facets. + + Ingress (required): `authenticate` + `to_inbound` (normalize an inbound event). + Outbound (optional): `deliver` + `chunk` — OpenClaw's `ChannelOutboundAdapter` + (`deliver` + `chunker`/`textChunkLimit`). Interactive channels that push replies + (Slack -> chat.postMessage) set `supports_outbound = True` and implement `deliver`. + A plain webhook leaves it unset — its reply is the HTTP response, so the caller + returns the reply rather than pushing it. `deliver_reply()` (below) is OpenClaw's + buffered dispatcher: it chunks the agent reply and calls `deliver` per block. + """ + + name: str = "unknown" + supports_outbound: bool = False + # Platform message size limit for chunking (OpenClaw textChunkLimit). None = no split. + text_chunk_limit: int | None = None + + @abstractmethod + def authenticate( + self, binding: ChannelBinding, request: Request, raw_body: bytes + ) -> bool: + """Return True iff the request is authentic for this binding.""" + + @abstractmethod + def to_inbound(self, route_id: str, body: dict[str, Any]) -> InboundMessage: + """Normalize the parsed JSON payload into an InboundMessage.""" + + async def deliver(self, peer_id: str, text: str) -> None: + """Send one (already-chunked) reply block to conversation `peer_id`. + + Default: unsupported. Push channels override this (and set supports_outbound). + """ + raise NotImplementedError(f"channel {self.name!r} has no outbound deliver()") + + def chunk(self, text: str) -> list[str]: + """Split a reply to `text_chunk_limit`, preferring paragraph boundaries + (OpenClaw's chunker, markdown-agnostic default).""" + limit = self.text_chunk_limit + if not limit or len(text) <= limit: + return [text] + out: list[str] = [] + cur = "" + for para in text.split("\n\n"): + if cur and len(cur) + len(para) + 2 > limit: + out.append(cur) + cur = "" + cur = f"{cur}\n\n{para}" if cur else para + if cur: + out.append(cur) + return out or [text[:limit]] + + +async def deliver_reply(channel: Channel, peer_id: str, text: str) -> None: + """Buffered reply dispatcher (OpenClaw dispatchReplyWithBufferedBlockDispatcher): + chunk the agent reply and deliver each block through the channel's outbound.""" + for block in channel.chunk(text): + if block.strip(): + await channel.deliver(peer_id, block) diff --git a/agentex/src/domain/channels/router.py b/agentex/src/domain/channels/router.py new file mode 100644 index 00000000..f07c5811 --- /dev/null +++ b/agentex/src/domain/channels/router.py @@ -0,0 +1,171 @@ +"""ChannelRouter: turn a normalized InboundMessage into an agent turn, and (for +channels that respond) retrieve the agent's reply. + +Channel-agnostic — the same path serves webhook, Slack, or any future channel. +Works for both ACP types: sync agents take the turn via message/send (reply returned +inline); async/agentic agents via event/send (reply lands on the task's message +stream, retrieved by `await_reply`). Continuity is free: the InboundMessage's +session_key is reused as the agentex task name (task/create is get-or-create on name). + +A responding channel pairs this with the outbound side (OpenClaw deliver/chunker): + result = await router.dispatch(inbound, binding, acp_type) + reply = result.reply or await router.await_reply(result.task_id, result.after_id) + await deliver_reply(channel, inbound.peer_id, reply) # chunk + deliver per block +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass + +from src.domain.channels.base import ChannelBinding, InboundMessage +from src.domain.entities.agents import ACPType +from src.domain.entities.agents_rpc import ( + AgentRPCMethod, + CreateTaskRequestEntity, + SendEventRequestEntity, + SendMessageRequestEntity, +) +from src.domain.entities.task_messages import ( + MessageAuthor, + TaskMessageContentType, + TextContentEntity, +) +from src.domain.services.task_message_service import TaskMessageService +from src.domain.use_cases.agents_acp_use_case import AgentsACPUseCase +from src.utils.logging import make_logger + +logger = make_logger(__name__) + + +@dataclass +class DispatchResult: + task_id: str + # Sync agents return their reply inline. Async agents reply later on the task + # stream — `reply` is None and `after_id` marks where to read new messages from. + reply: str | None = None + after_id: str | None = None + + +class ChannelRouter: + def __init__( + self, + acp_use_case: AgentsACPUseCase, + task_message_service: TaskMessageService, + ): + self._acp = acp_use_case + self._messages = task_message_service + + async def dispatch( + self, inbound: InboundMessage, binding: ChannelBinding, acp_type: ACPType + ) -> DispatchResult: + session_key = inbound.session_key(binding.agent_name) + headers = binding.forward_headers or None + + task = await self._acp.handle_rpc_request( + method=AgentRPCMethod.TASK_CREATE, + params=CreateTaskRequestEntity( + name=session_key, + params=binding.params, + task_metadata={ + "channel": inbound.channel, + "route_id": inbound.route_id, + "peer_id": inbound.peer_id, + "sender_id": inbound.sender_id, + }, + ), + agent_name=binding.agent_name, + request_headers=headers, + ) + task_id = task.id + logger.info( + "[channels] %s route=%s acp=%s -> task %s (session %s)", + inbound.channel, + inbound.route_id, + acp_type.value, + task_id, + session_key, + ) + + content = TextContentEntity(author=MessageAuthor.USER, content=inbound.text) + + if acp_type == ACPType.SYNC: + # Sync: message/send carries the turn and returns the reply messages inline. + messages = await self._acp.handle_rpc_request( + method=AgentRPCMethod.MESSAGE_SEND, + params=SendMessageRequestEntity( + task_id=task_id, content=content, stream=False + ), + agent_name=binding.agent_name, + request_headers=headers, + ) + return DispatchResult(task_id=task_id, reply=_agent_text(messages)) + + # Async / agentic: event/send delivers the turn; the reply lands on the stream. + # Mark the newest message now so await_reply only reads the new reply. + after_id = await self._newest_message_id(task_id) + await self._acp.handle_rpc_request( + method=AgentRPCMethod.EVENT_SEND, + params=SendEventRequestEntity(task_id=task_id, content=content), + agent_name=binding.agent_name, + request_headers=headers, + ) + return DispatchResult(task_id=task_id, reply=None, after_id=after_id) + + async def _newest_message_id(self, task_id: str) -> str | None: + msgs = await self._messages.get_messages( + task_id=task_id, limit=1, page_number=1 + ) + return msgs[0].id if msgs else None + + async def await_reply( + self, + task_id: str, + after_id: str | None = None, + timeout_s: float = 120.0, + interval_s: float = 2.0, + quiescence_s: float = 6.0, + ) -> str | None: + """Poll the task's messages for the agent's text reply after `after_id`, until + it settles (no change for `quiescence_s`) or `timeout_s`. Used by responding + channels to retrieve an async agent's reply before delivering it.""" + waited = 0.0 + last: str | None = None + stable_for = 0.0 + while waited < timeout_s: + await asyncio.sleep(interval_s) + waited += interval_s + msgs = await self._messages.get_messages( + task_id=task_id, + limit=100, + page_number=1, + order_direction="asc", + after_id=after_id, + ) + text = _agent_text(msgs) + if text and text == last: + stable_for += interval_s + if stable_for >= quiescence_s: + return text + elif text: + last = text + stable_for = 0.0 + return last + + +def _is_agent_text(message: object) -> bool: + content = getattr(message, "content", None) + return ( + content is not None + and getattr(content, "type", None) == TaskMessageContentType.TEXT + and getattr(content, "author", None) == MessageAuthor.AGENT + and bool((getattr(content, "content", "") or "").strip()) + ) + + +def _agent_text(messages: object) -> str | None: + """Join the agent-authored text from a message list (sync result or polled stream).""" + if not isinstance(messages, list): + return None + parts = [m.content.content.strip() for m in messages if _is_agent_text(m)] + return "\n\n".join(parts) if parts else None diff --git a/agentex/src/domain/channels/webhook.py b/agentex/src/domain/channels/webhook.py new file mode 100644 index 00000000..5ba3598a --- /dev/null +++ b/agentex/src/domain/channels/webhook.py @@ -0,0 +1,63 @@ +"""WebhookChannel: generic HTTP ingress (GitHub/Gitea/Zapier/anything that POSTs JSON). + +Modeled on OpenClaw `extensions/webhooks`: per-route shared secret, POST + JSON only, +size cap, timing-safe auth. Accepts either: + - `Authorization: Bearer ` / `x-openclaw-webhook-secret: ` (generic), or + - `X-Hub-Signature-256: sha256=` (GitHub/Gitea) + +The payload is normalized generically (text/message/goal/prompt, else the raw JSON). +Source-specific shaping (e.g. PR rendering) belongs to the caller or the agent's +system prompt — this stays a generic channel. +""" + +from __future__ import annotations + +import json +from typing import Any + +from starlette.requests import Request + +from src.domain.channels.base import ( + Channel, + ChannelBinding, + InboundMessage, + verify_hmac_sha256, + verify_shared_secret, +) + +MAX_BODY_BYTES = 256 * 1024 + + +class WebhookChannel(Channel): + name = "webhook" + + def authenticate( + self, binding: ChannelBinding, request: Request, raw_body: bytes + ) -> bool: + gh_sig = request.headers.get("x-hub-signature-256") + if gh_sig is not None: + return verify_hmac_sha256(binding.secret, raw_body, gh_sig) + auth = request.headers.get("authorization", "") + presented = ( + auth[len("Bearer ") :].strip() if auth.startswith("Bearer ") else None + ) + presented = presented or request.headers.get("x-openclaw-webhook-secret") + return verify_shared_secret(presented, binding.secret) + + def to_inbound(self, route_id: str, body: dict[str, Any]) -> InboundMessage: + return InboundMessage( + text=_render_text(body), + channel=self.name, + route_id=route_id, + peer_id=route_id, + sender_id="webhook", + raw=body, + ) + + +def _render_text(body: dict[str, Any]) -> str: + for key in ("text", "message", "goal", "prompt"): + value = body.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + return json.dumps(body, indent=2)[:8000]