diff --git a/.github/workflows/test-api.yml b/.github/workflows/test-api.yml new file mode 100644 index 00000000..bb27ceaa --- /dev/null +++ b/.github/workflows/test-api.yml @@ -0,0 +1,67 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Test API + +permissions: + contents: read + +on: + workflow_dispatch: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + failover: + runs-on: ubuntu-latest + services: + mock-server: + image: livekit/test-server:latest + ports: + - 9999:9999 + - 10000:10000 + - 10001:10001 + - 10002:10002 + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + with: + submodules: true + + - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6 + with: + python-version: "3.12" + + - name: Install uv + uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0 + + - name: Install livekit-api (without livekit-rtc) + run: | + uv venv .failover-venv + source .failover-venv/bin/activate + uv pip install ./livekit-protocol ./livekit-api pytest aiohttp + + - name: Wait for mock server + run: | + for i in $(seq 1 30); do + curl -sf http://127.0.0.1:9999/settings/regions >/dev/null && exit 0 + sleep 1 + done + echo "mock server did not become ready" && exit 1 + + - name: Run API tests + run: | + source .failover-venv/bin/activate + pytest tests/api -v diff --git a/livekit-api/livekit/api/_failover.py b/livekit-api/livekit/api/_failover.py new file mode 100644 index 00000000..49d239ed --- /dev/null +++ b/livekit-api/livekit/api/_failover.py @@ -0,0 +1,150 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Region failover for the Twirp API clients. + +On a retryable failure (any transport error or HTTP 5xx) the client discovers +alternative LiveKit Cloud regions via ``/settings/regions`` and replays the +request against the next region, with exponential backoff. 4xx responses are +returned immediately. +""" + +from __future__ import annotations + +import time +from dataclasses import dataclass +from typing import Dict, List, Optional +from urllib.parse import urlparse + +import aiohttp + +FAILOVER_MAX_ATTEMPTS = 3 +FAILOVER_BACKOFF_BASE = 0.2 # seconds + + +def failover_attempts(enabled: bool, host: Optional[str], force: bool = False) -> int: + """Total request attempts for a host; 1 means no failover. Failover only + engages when enabled and the host is a LiveKit Cloud domain. ``force`` + bypasses the cloud-host check and is for internal testing only. + """ + if enabled and (force or (host is not None and is_cloud(host))): + return FAILOVER_MAX_ATTEMPTS + return 1 + + +def is_cloud(host: str) -> bool: + # Failover only engages for LiveKit Cloud project domains. + return host.endswith(".livekit.cloud") + + +def to_http(url: str) -> str: + """Normalizes a region URL to an http(s) scheme (ws -> http, wss -> https).""" + if url.startswith("ws"): + return "http" + url[2:] + return url + + +def origin_of(url: str) -> str: + """Returns the scheme://host[:port] origin of a URL, dropping any path.""" + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" + + +def host_key(url: str) -> str: + """A stable key identifying a host (including port) for dedup across attempts.""" + return urlparse(url).netloc.lower() + + +def pick_next(region_origins: List[str], attempted: set[str]) -> Optional[str]: + """Returns the first region origin whose host has not yet been attempted.""" + for origin in region_origins: + if host_key(origin) not in attempted: + return origin + return None + + +@dataclass +class _CacheEntry: + origins: List[str] + fetched_at: float + ttl: float + + +class RegionCache: + """Process-wide cache of the LiveKit Cloud region list, keyed by host.""" + + def __init__(self) -> None: + self._entries: Dict[str, _CacheEntry] = {} + + async def region_origins( + self, + session: aiohttp.ClientSession, + origin: str, + headers: Dict[str, str], + ) -> List[str]: + """Returns alternative region origins for ``origin``, fetching + ``/settings/regions`` if the cache is stale. Best-effort: on a fetch + failure it serves a stale cached list when available, otherwise an empty + list. Forwards ``headers`` so a valid token — and any test directives — + reach the discovery endpoint.""" + key = host_key(origin) + entry = self._entries.get(key) + if entry is not None and (time.monotonic() - entry.fetched_at) < entry.ttl: + return entry.origins + + try: + origins, ttl = await self._fetch(session, origin, headers) + except Exception: + return entry.origins if entry is not None else [] + + # A zero TTL (e.g. Cache-Control: max-age=0) means "do not cache". + if ttl > 0: + self._entries[key] = _CacheEntry(origins, time.monotonic(), ttl) + return origins + + async def _fetch( + self, + session: aiohttp.ClientSession, + origin: str, + headers: Dict[str, str], + ) -> tuple[List[str], float]: + fetch_headers = { + k: v for k, v in headers.items() if k.lower() not in ("content-type", "content-length") + } + # Short timeout so a slow/unreachable discovery endpoint doesn't stall + # the failover path. + async with session.get( + f"{origin}/settings/regions", + headers=fetch_headers, + timeout=aiohttp.ClientTimeout(total=2), + ) as resp: + if resp.status != 200: + raise RuntimeError(f"region discovery failed: {resp.status}") + ttl = _parse_max_age(resp.headers.get("Cache-Control")) + body = await resp.json() + origins = [origin_of(to_http(r["url"])) for r in body.get("regions", []) if r.get("url")] + return origins, ttl + + +def _parse_max_age(cache_control: Optional[str]) -> float: + if not cache_control: + return 0.0 + for directive in cache_control.split(","): + directive = directive.strip().lower() + if directive.startswith("max-age="): + try: + return float(int(directive[len("max-age=") :])) + except ValueError: + return 0.0 + return 0.0 diff --git a/livekit-api/livekit/api/_service.py b/livekit-api/livekit/api/_service.py index 7777f6ee..a198c388 100644 --- a/livekit-api/livekit/api/_service.py +++ b/livekit-api/livekit/api/_service.py @@ -9,8 +9,15 @@ class Service(ABC): - def __init__(self, session: aiohttp.ClientSession, host: str, api_key: str, api_secret: str): - self._client = TwirpClient(session, host, "livekit") + def __init__( + self, + session: aiohttp.ClientSession, + host: str, + api_key: str, + api_secret: str, + failover: bool = True, + ): + self._client = TwirpClient(session, host, "livekit", failover=failover) self.api_key = api_key self.api_secret = api_secret diff --git a/livekit-api/livekit/api/access_token.py b/livekit-api/livekit/api/access_token.py index 86766c3a..e6516819 100644 --- a/livekit-api/livekit/api/access_token.py +++ b/livekit-api/livekit/api/access_token.py @@ -271,9 +271,10 @@ def verify(self, token: str, *, verify_signature: bool = True) -> Claims: if claims.get("roomPreset"): grant_claims.room_preset = claims.get("roomPreset") - if claims.get("roomConfig"): + room_config = claims.get("roomConfig") + if room_config: grant_claims.room_config = ParseDict( - claims.get("roomConfig"), + room_config, RoomConfiguration(), ignore_unknown_fields=True, ) diff --git a/livekit-api/livekit/api/agent_dispatch_service.py b/livekit-api/livekit/api/agent_dispatch_service.py index 375cc527..d014e6d0 100644 --- a/livekit-api/livekit/api/agent_dispatch_service.py +++ b/livekit-api/livekit/api/agent_dispatch_service.py @@ -26,8 +26,15 @@ class AgentDispatchService(Service): ``` """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: bool = True, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def create_dispatch(self, req: CreateAgentDispatchRequest) -> AgentDispatch: """Create an explicit dispatch for an agent to join a room. diff --git a/livekit-api/livekit/api/connector_service.py b/livekit-api/livekit/api/connector_service.py index 2d686e89..d5eaf2a1 100644 --- a/livekit-api/livekit/api/connector_service.py +++ b/livekit-api/livekit/api/connector_service.py @@ -35,8 +35,15 @@ class ConnectorService(Service): ``` """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: bool = True, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def dial_whatsapp_call( self, request: DialWhatsAppCallRequest diff --git a/livekit-api/livekit/api/egress_service.py b/livekit-api/livekit/api/egress_service.py index 491bff3b..12f81c3e 100644 --- a/livekit-api/livekit/api/egress_service.py +++ b/livekit-api/livekit/api/egress_service.py @@ -33,8 +33,15 @@ class EgressService(Service): Also see https://docs.livekit.io/home/egress/overview/ """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: bool = True, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def start_room_composite_egress(self, start: RoomCompositeEgressRequest) -> EgressInfo: """Starts a composite recording of a room.""" diff --git a/livekit-api/livekit/api/ingress_service.py b/livekit-api/livekit/api/ingress_service.py index 296dabbe..b29ce460 100644 --- a/livekit-api/livekit/api/ingress_service.py +++ b/livekit-api/livekit/api/ingress_service.py @@ -28,8 +28,15 @@ class IngressService(Service): Also see https://docs.livekit.io/home/ingress/overview/ """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: bool = True, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def create_ingress(self, create: CreateIngressRequest) -> IngressInfo: return await self._client.request( diff --git a/livekit-api/livekit/api/livekit_api.py b/livekit-api/livekit/api/livekit_api.py index 16d26831..bc0fcfaf 100644 --- a/livekit-api/livekit/api/livekit_api.py +++ b/livekit-api/livekit/api/livekit_api.py @@ -31,6 +31,7 @@ def __init__( *, timeout: Optional[aiohttp.ClientTimeout] = None, session: Optional[aiohttp.ClientSession] = None, + failover: bool = True, ): """Create a new LiveKitAPI instance. @@ -59,12 +60,14 @@ def __init__( timeout = aiohttp.ClientTimeout(total=60) self._session = aiohttp.ClientSession(timeout=timeout) - self._room = RoomService(self._session, url, api_key, api_secret) - self._ingress = IngressService(self._session, url, api_key, api_secret) - self._egress = EgressService(self._session, url, api_key, api_secret) - self._sip = SipService(self._session, url, api_key, api_secret) - self._agent_dispatch = AgentDispatchService(self._session, url, api_key, api_secret) - self._connector = ConnectorService(self._session, url, api_key, api_secret) + self._room = RoomService(self._session, url, api_key, api_secret, failover) + self._ingress = IngressService(self._session, url, api_key, api_secret, failover) + self._egress = EgressService(self._session, url, api_key, api_secret, failover) + self._sip = SipService(self._session, url, api_key, api_secret, failover) + self._agent_dispatch = AgentDispatchService( + self._session, url, api_key, api_secret, failover + ) + self._connector = ConnectorService(self._session, url, api_key, api_secret, failover) @property def agent_dispatch(self) -> AgentDispatchService: diff --git a/livekit-api/livekit/api/room_service.py b/livekit-api/livekit/api/room_service.py index 5c0e3b76..e3cb7bb6 100644 --- a/livekit-api/livekit/api/room_service.py +++ b/livekit-api/livekit/api/room_service.py @@ -45,8 +45,15 @@ class RoomService(Service): Also see https://docs.livekit.io/home/server/managing-rooms/ and https://docs.livekit.io/home/server/managing-participants/ """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: bool = True, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def create_room( self, diff --git a/livekit-api/livekit/api/sip_service.py b/livekit-api/livekit/api/sip_service.py index 0364c797..36d215f3 100644 --- a/livekit-api/livekit/api/sip_service.py +++ b/livekit-api/livekit/api/sip_service.py @@ -53,8 +53,15 @@ class SipService(Service): ``` """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: bool = True, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def create_inbound_trunk( self, create: CreateSIPInboundTrunkRequest diff --git a/livekit-api/livekit/api/twirp_client.py b/livekit-api/livekit/api/twirp_client.py index ce828c3f..1c8d9966 100644 --- a/livekit-api/livekit/api/twirp_client.py +++ b/livekit-api/livekit/api/twirp_client.py @@ -12,14 +12,30 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, Type, TypeVar, Optional +import asyncio +import logging +from typing import Dict, List, Optional, Type, TypeVar import aiohttp from google.protobuf.message import Message from urllib.parse import urlparse +from ._failover import ( + FAILOVER_BACKOFF_BASE, + RegionCache, + failover_attempts, + host_key, + origin_of, + pick_next, +) + DEFAULT_PREFIX = "twirp" +logger = logging.getLogger("livekit") + +# Shared across all clients in the process so the region list is fetched once. +_REGION_CACHE = RegionCache() + class TwirpError(Exception): def __init__( @@ -92,6 +108,10 @@ def __init__( host: str, pkg: str, prefix: str = DEFAULT_PREFIX, + failover: bool = True, + *, + _failover_force: bool = False, + _failover_backoff: float = FAILOVER_BACKOFF_BASE, ) -> None: parse_res = urlparse(host) scheme = parse_res.scheme @@ -103,6 +123,11 @@ def __init__( self.pkg = pkg self.prefix = prefix self._session = session + # _failover_force / _failover_backoff are internal test-only knobs. + self._failover = failover + self._failover_force = _failover_force + self._failover_backoff = _failover_backoff + self._origin = origin_of(self.host) async def request( self, @@ -114,21 +139,80 @@ async def request( *, timeout: Optional[aiohttp.ClientTimeout] = None, ) -> T: - url = f"{self.host}/{self.prefix}/{self.pkg}.{service}/{method}" + """Issues a Twirp request, failing over to alternative regions on + retryable errors. On any transport error or HTTP 5xx it discovers + regions via ``/settings/regions`` and replays the request — body and + headers intact — against the next untried region, with exponential + backoff. A 4xx is returned immediately.""" + path = f"{self.prefix}/{self.pkg}.{service}/{method}" + forward_headers = dict(headers) # for the discovery fetch (no content-type yet) + headers = dict(headers) headers["Content-Type"] = "application/protobuf" - serialized_data = data.SerializeToString() - async with self._session.post( - url, headers=headers, data=serialized_data, timeout=timeout - ) as resp: - if resp.status == 200: - return response_class.FromString(await resp.read()) - else: - # when we have an error, Twirp always encode it in json - error_data = await resp.json() - raise TwirpError( - error_data.get("code", "unknown"), - error_data.get("msg", ""), - status=resp.status, - metadata=error_data.get("meta"), - ) + + host = urlparse(self._origin).hostname + max_attempts = failover_attempts(self._failover, host, self._failover_force) + attempted = {host_key(self._origin)} + region_origins: Optional[List[str]] = None + current_origin = self._origin + + for attempt in range(max_attempts): + is_last = attempt + 1 >= max_attempts + url = f"{current_origin}/{path}" + + transport_exc: Optional[BaseException] = None + retryable_status: Optional[int] = None + error_data: Dict = {} + try: + async with self._session.post( + url, headers=headers, data=serialized_data, timeout=timeout + ) as resp: + if resp.status == 200: + return response_class.FromString(await resp.read()) + # Twirp encodes errors as JSON regardless of content type. + try: + error_data = await resp.json() + except Exception: + error_data = {} + if resp.status < 500: + # 4xx is terminal. + raise self._twirp_error(error_data, resp.status) + retryable_status = resp.status + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + transport_exc = e + + # Only retryable failures (5xx or transport error) reach this point. + next_origin = None + if not is_last: + if region_origins is None: + region_origins = await _REGION_CACHE.region_origins( + self._session, self._origin, forward_headers + ) + next_origin = pick_next(region_origins, attempted) + + if next_origin is None: + if transport_exc is not None: + raise transport_exc + raise self._twirp_error(error_data, retryable_status or 500) + + reason = transport_exc if transport_exc is not None else f"status {retryable_status}" + logger.warning( + "livekit API request to %s failed (%s), retrying with fallback url %s", + current_origin, + reason, + next_origin, + ) + await asyncio.sleep(self._failover_backoff * (2**attempt)) + attempted.add(host_key(next_origin)) + current_origin = next_origin + + raise RuntimeError("failover loop exited without returning") # unreachable + + @staticmethod + def _twirp_error(error_data: Dict, status: int) -> "TwirpError": + return TwirpError( + error_data.get("code", "unknown"), + error_data.get("msg", ""), + status=status, + metadata=error_data.get("meta"), + ) diff --git a/tests/api/test_failover.py b/tests/api/test_failover.py new file mode 100644 index 00000000..ce68e043 --- /dev/null +++ b/tests/api/test_failover.py @@ -0,0 +1,109 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Region failover tests against the shared mock LiveKit API server +(livekit/livekit cmd/test-server). Point them at a running instance with +LK_TEST_SERVER_URL (default http://127.0.0.1:9999); they skip when no server is +reachable. The mock returns Cache-Control: max-age=0, so the region cache never +stores entries and scenarios don't interfere. + +See cmd/test-server/README.md for the X-Lk-Mock-* control protocol. These tests +drive TwirpClient.request() directly because the public service methods do not +expose per-call headers. +""" + +import asyncio +import os +import urllib.request + +import aiohttp +import pytest + +from livekit.api import CreateRoomRequest, Room, TwirpError +from livekit.api.twirp_client import TwirpClient + +BASE = os.getenv("LK_TEST_SERVER_URL", "http://127.0.0.1:9999") + + +def _server_up() -> bool: + try: + with urllib.request.urlopen(f"{BASE}/settings/regions", timeout=1) as r: + return r.status == 200 + except Exception: + return False + + +pytestmark = pytest.mark.skipif( + not _server_up(), reason=f"mock test server not reachable at {BASE}" +) + + +# _failover_force bypasses the cloud-host check (the mock is on 127.0.0.1) and a +# tiny backoff keeps the tests fast — both are internal, test-only knobs. +async def _call(directives: dict, *, failover: bool = True, force: bool = True) -> Room: + async with aiohttp.ClientSession() as session: + client = TwirpClient( + session, + BASE, + "livekit", + failover=failover, + _failover_force=force, + _failover_backoff=0.001, + ) + headers = {"authorization": "Bearer test-token", **directives} + return await client.request("RoomService", "CreateRoom", CreateRoomRequest(), headers, Room) + + +def test_healthy(): + asyncio.run(_call({})) + + +def test_primary_unavailable(): + asyncio.run(_call({"x-lk-mock-fail-regions": "0"})) + + +def test_two_regions_unavailable(): + asyncio.run(_call({"x-lk-mock-fail-regions": "0,1"})) + + +def test_all_unavailable(): + with pytest.raises(TwirpError): + asyncio.run(_call({"x-lk-mock-fail-regions": "0,1,2,3"})) + + +def test_client_error_not_retried(): + with pytest.raises(TwirpError) as exc: + asyncio.run(_call({"x-lk-mock-fail-regions": "0", "x-lk-mock-fail-status": "400"})) + assert exc.value.code == "invalid_argument" + + +def test_transport_error_failover(): + asyncio.run(_call({"x-lk-mock-fail-regions": "0", "x-lk-mock-fail-mode": "drop"})) + + +def test_region_discovery_unreachable(): + with pytest.raises(TwirpError): + asyncio.run(_call({"x-lk-mock-fail-regions": "0", "x-lk-mock-regions-status": "500"})) + + +def test_not_cloud_host(): + # Enabled but not forced; 127.0.0.1 is not a cloud host, so no failover. + with pytest.raises(TwirpError): + asyncio.run(_call({"x-lk-mock-fail-regions": "0"}, force=False)) + + +def test_disabled(): + # failover=False disables failover entirely. + with pytest.raises(TwirpError): + asyncio.run(_call({"x-lk-mock-fail-regions": "0"}, failover=False)) diff --git a/uv.lock b/uv.lock index 1efa8869..e912b879 100644 --- a/uv.lock +++ b/uv.lock @@ -1869,8 +1869,8 @@ dependencies = [ requires-dist = [ { name = "aiofiles", specifier = ">=24" }, { name = "numpy", specifier = ">=1.26" }, - { name = "protobuf", specifier = ">=4.25.0" }, - { name = "types-protobuf", specifier = ">=3" }, + { name = "protobuf", specifier = ">=5" }, + { name = "types-protobuf", specifier = ">=5" }, ] [[package]] @@ -1908,8 +1908,8 @@ dependencies = [ [package.metadata] requires-dist = [ - { name = "protobuf", specifier = ">=4" }, - { name = "types-protobuf", specifier = ">=4" }, + { name = "protobuf", specifier = ">=5" }, + { name = "types-protobuf", specifier = ">=5" }, ] [[package]]