Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions agentex/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,48 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/channels/webhook/{route_id}:
post:
tags:
- Channels
summary: Generic webhook channel
description: 'Authenticated webhook ingress: verifies a per-route secret/HMAC
and drives an agent turn. Auth is whitelisted at the middleware; the channel
verifies the route secret instead.'
operationId: handle_webhook_channels_webhook__route_id__post
parameters:
- name: route_id
in: path
required: true
schema:
type: string
title: Route Id
- name: wait
in: query
required: false
schema:
type: boolean
description: 'If true, wait for the agent''s reply and return it (for synchronous
callers). Default false: return immediately with the task_id.'
default: false
title: Wait
description: 'If true, wait for the agent''s reply and return it (for synchronous
callers). Default false: return immediately with the task_id.'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
type: object
additionalProperties: true
title: Response Handle Webhook Channels Webhook Route Id Post
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/tasks/{task_id}:
get:
tags:
Expand Down
2 changes: 2 additions & 0 deletions agentex/src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
agent_api_keys,
agent_task_tracker,
agents,
channels,
checkpoints,
deployment_history,
deployments,
Expand Down Expand Up @@ -193,6 +194,7 @@ async def handle_unexpected(request, exc):

# Include all routers
fastapi_app.include_router(agents.router)
fastapi_app.include_router(channels.router)
fastapi_app.include_router(tasks.router)
fastapi_app.include_router(messages.router)
fastapi_app.include_router(spans.router)
Expand Down
3 changes: 3 additions & 0 deletions agentex/src/api/middleware_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"/readyz",
"/ping",
"/echo",
# Channels (webhook, …) bypass agentex API-key auth and verify a per-route
# shared secret / HMAC inside the channel instead.
"/channels/webhook",
}

DROP_HEADERS: set[str] = {
Expand Down
124 changes: 124 additions & 0 deletions agentex/src/api/routes/channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Channels API: external surfaces (webhook now, Slack later) that drive agent turns.

The webhook endpoint is auth-whitelisted in `middleware_utils.WHITELISTED_ROUTES`
(it bypasses the agentex API-key auth) and instead verifies a per-route shared
secret / HMAC inside the channel. Each route binds to one agent and an opaque
`params` dict forwarded verbatim to task/create (agentex does not interpret it —
the bound agent does).

Route bindings live in the CHANNELS_WEBHOOK_ROUTES env var (JSON) for now — the
seam where a real per-config store (resolving a saved agent_config_id) plugs in later:

CHANNELS_WEBHOOK_ROUTES='{"demo": {"secret": "<shared-secret>",
"agent_name": "golden-agent",
"params": {"system_prompt": "You are ...", "mcps": []}}}'
"""

from __future__ import annotations

import json
import os

from fastapi import APIRouter, HTTPException, Query, Request

from src.domain.channels.base import ChannelBinding
from src.domain.channels.router import ChannelRouter
from src.domain.channels.webhook import MAX_BODY_BYTES, WebhookChannel
from src.domain.services.task_message_service import DTaskMessageService
from src.domain.use_cases.agents_acp_use_case import DAgentsACPUseCase
from src.domain.use_cases.agents_use_case import DAgentsUseCase
from src.utils.logging import make_logger

logger = make_logger(__name__)

router = APIRouter(prefix="/channels", tags=["Channels"])

# Channel registry — add "slack": SlackChannel() here when it lands.
_WEBHOOK = WebhookChannel()


def _webhook_binding(route_id: str) -> ChannelBinding | None:
"""Resolve a webhook route's binding. Env-backed seam; replace with a config store."""
raw = os.environ.get("CHANNELS_WEBHOOK_ROUTES")
if not raw:
return None
try:
routes = json.loads(raw)
except json.JSONDecodeError:
logger.error("[channels] CHANNELS_WEBHOOK_ROUTES is not valid JSON")
return None
cfg = routes.get(route_id)
if not isinstance(cfg, dict) or not cfg.get("secret") or not cfg.get("agent_name"):
return None
# `params` is opaque — forwarded verbatim to task/create; agentex does not
# interpret it (an agent like golden-agent reads system_prompt/mcps/etc. there).
params = cfg.get("params")
return ChannelBinding(
secret=cfg["secret"],
agent_name=cfg["agent_name"],
params=params if isinstance(params, dict) else {},
)


@router.post(
"/webhook/{route_id}",
summary="Generic webhook channel",
description="Authenticated webhook ingress: verifies a per-route secret/HMAC and "
"drives an agent turn. Auth is whitelisted at the middleware; the channel verifies "
"the route secret instead.",
)
async def handle_webhook(
route_id: str,
request: Request,
agents_acp_use_case: DAgentsACPUseCase,
agents_use_case: DAgentsUseCase,
task_message_service: DTaskMessageService,
wait: bool = Query(
False,
description="If true, wait for the agent's reply and return it (for synchronous "
"callers). Default false: return immediately with the task_id.",
),
) -> dict:
binding = _webhook_binding(route_id)
if binding is None:
raise HTTPException(status_code=404, detail="unknown route")

raw = await request.body()
if len(raw) > MAX_BODY_BYTES:
raise HTTPException(status_code=413, detail="payload too large")
if not _WEBHOOK.authenticate(binding, request, raw):
raise HTTPException(status_code=401, detail="unauthorized")
if "application/json" not in request.headers.get("content-type", ""):
raise HTTPException(status_code=400, detail="expected application/json")
try:
body = json.loads(raw)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="invalid json")
if not isinstance(body, dict):
raise HTTPException(status_code=400, detail="json body must be an object")

# Resolve the agent's ACP type so the router picks the right turn method
# (sync -> message/send returns the reply inline; async -> event/send).
agent = await agents_use_case.get(name=binding.agent_name)

inbound = _WEBHOOK.to_inbound(route_id, body)
router_ = ChannelRouter(agents_acp_use_case, task_message_service)
result = await router_.dispatch(inbound, binding, agent.acp_type)

# A plain webhook has no outbound push (supports_outbound is False) — its reply is
# the HTTP response. Sync agents reply inline; for async, `wait` streams it back.
# A push channel (Slack) would instead: reply = result.reply or await_reply(...);
# await deliver_reply(channel, peer_id, reply).
reply = result.reply
if reply is None and wait:
reply = await router_.await_reply(result.task_id, result.after_id)

response = {
"ok": True,
"channel": "webhook",
"route_id": route_id,
"task_id": result.task_id,
}
if reply is not None:
response["reply"] = reply
return response
1 change: 1 addition & 0 deletions agentex/src/domain/channels/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Channels: normalize external surfaces (webhook, Slack, …) into agent turns."""
131 changes: 131 additions & 0 deletions agentex/src/domain/channels/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Channel abstraction: normalize any external surface (webhook, Slack, …) into one
inbound shape, and drive an agent turn from it.

Modeled on OpenClaw's channel plugins (github.com/openclaw/openclaw,
`src/channels/**`) and the claw0 tutorial: every platform produces the same
`InboundMessage`; the agent-driving core is channel-agnostic. A new channel
(Slack, etc.) implements `Channel` — `authenticate` + `to_inbound` for ingress,
and (when it needs to push replies) its own outbound — without touching the router.
"""

from __future__ import annotations

import hashlib
import hmac
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any

from starlette.requests import Request


@dataclass
class InboundMessage:
"""Normalized inbound event. Every channel produces this; the router only sees this."""

text: str
channel: str # "webhook" | "slack" | …
route_id: str = "" # which binding received it (webhook route / slack team)
peer_id: str = "" # conversation scope: DM user, channel:thread, route_id, …
sender_id: str = ""
raw: dict[str, Any] = field(default_factory=dict)

def session_key(self, agent_name: str) -> str:
"""Stable per-conversation key → reused as the agentex task name (get-or-create)."""
basis = self.peer_id or self.route_id or "main"
digest = hashlib.sha1(
f"{agent_name}:{self.channel}:{basis}".encode()
).hexdigest()[:16]
return f"ch-{self.channel}-{digest}"


@dataclass
class ChannelBinding:
"""A route's binding to one agent.

`params` is an OPAQUE dict forwarded verbatim as the task/create params — the
agentex platform does not interpret it. Whatever a given agent expects there
(e.g. golden-agent's system_prompt / mcps / harness / model) is that agent's
concern, not the channel layer's. Later this can be sourced from a saved config.
"""

secret: str
agent_name: str
params: dict[str, Any] = field(default_factory=dict)
# Headers the router forwards to the agent (auth/delegation). Empty for local/open.
forward_headers: dict[str, str] = field(default_factory=dict)


def verify_shared_secret(presented: str | None, secret: str) -> bool:
"""Timing-safe shared-secret check (OpenClaw `safeEqualSecret`)."""
if not presented or not secret:
return False
return hmac.compare_digest(presented, secret)


def verify_hmac_sha256(secret: str, body: bytes, signature_header: str | None) -> bool:
"""Verify an `sha256=<hex>` HMAC signature (GitHub/Gitea-style)."""
if not signature_header or not signature_header.startswith("sha256="):
return False
expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature_header)


class Channel(ABC):
"""One external surface, modeled on OpenClaw's channel plugin facets.

Ingress (required): `authenticate` + `to_inbound` (normalize an inbound event).
Outbound (optional): `deliver` + `chunk` — OpenClaw's `ChannelOutboundAdapter`
(`deliver` + `chunker`/`textChunkLimit`). Interactive channels that push replies
(Slack -> chat.postMessage) set `supports_outbound = True` and implement `deliver`.
A plain webhook leaves it unset — its reply is the HTTP response, so the caller
returns the reply rather than pushing it. `deliver_reply()` (below) is OpenClaw's
buffered dispatcher: it chunks the agent reply and calls `deliver` per block.
"""

name: str = "unknown"
supports_outbound: bool = False
# Platform message size limit for chunking (OpenClaw textChunkLimit). None = no split.
text_chunk_limit: int | None = None

@abstractmethod
def authenticate(
self, binding: ChannelBinding, request: Request, raw_body: bytes
) -> bool:
"""Return True iff the request is authentic for this binding."""

@abstractmethod
def to_inbound(self, route_id: str, body: dict[str, Any]) -> InboundMessage:
"""Normalize the parsed JSON payload into an InboundMessage."""

async def deliver(self, peer_id: str, text: str) -> None:
"""Send one (already-chunked) reply block to conversation `peer_id`.

Default: unsupported. Push channels override this (and set supports_outbound).
"""
raise NotImplementedError(f"channel {self.name!r} has no outbound deliver()")

def chunk(self, text: str) -> list[str]:
"""Split a reply to `text_chunk_limit`, preferring paragraph boundaries
(OpenClaw's chunker, markdown-agnostic default)."""
limit = self.text_chunk_limit
if not limit or len(text) <= limit:
return [text]
out: list[str] = []
cur = ""
for para in text.split("\n\n"):
if cur and len(cur) + len(para) + 2 > limit:
out.append(cur)
cur = ""
cur = f"{cur}\n\n{para}" if cur else para
if cur:
out.append(cur)
return out or [text[:limit]]


async def deliver_reply(channel: Channel, peer_id: str, text: str) -> None:
"""Buffered reply dispatcher (OpenClaw dispatchReplyWithBufferedBlockDispatcher):
chunk the agent reply and deliver each block through the channel's outbound."""
for block in channel.chunk(text):
if block.strip():
await channel.deliver(peer_id, block)
Loading
Loading