From d2669883687a09096f5f0af718b9151373d8e132 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 28 Jun 2026 00:28:33 +0200 Subject: [PATCH 1/5] feat: auto failover APIs with LK Cloud retries in alternative datacenters on 5xx and transport failures --- .github/workflows/test-api.yml | 67 +++++++ livekit-api/livekit/api/__init__.py | 2 + livekit-api/livekit/api/_failover.py | 165 ++++++++++++++++++ livekit-api/livekit/api/_service.py | 13 +- .../livekit/api/agent_dispatch_service.py | 12 +- livekit-api/livekit/api/connector_service.py | 13 +- livekit-api/livekit/api/egress_service.py | 13 +- livekit-api/livekit/api/ingress_service.py | 13 +- livekit-api/livekit/api/livekit_api.py | 16 +- livekit-api/livekit/api/room_service.py | 13 +- livekit-api/livekit/api/sip_service.py | 12 +- livekit-api/livekit/api/twirp_client.py | 103 +++++++++-- tests/api/test_failover.py | 106 +++++++++++ uv.lock | 8 +- 14 files changed, 515 insertions(+), 41 deletions(-) create mode 100644 .github/workflows/test-api.yml create mode 100644 livekit-api/livekit/api/_failover.py create mode 100644 tests/api/test_failover.py diff --git a/.github/workflows/test-api.yml b/.github/workflows/test-api.yml new file mode 100644 index 00000000..bb27ceaa --- /dev/null +++ b/.github/workflows/test-api.yml @@ -0,0 +1,67 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Test API + +permissions: + contents: read + +on: + workflow_dispatch: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + failover: + runs-on: ubuntu-latest + services: + mock-server: + image: livekit/test-server:latest + ports: + - 9999:9999 + - 10000:10000 + - 10001:10001 + - 10002:10002 + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + with: + submodules: true + + - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6 + with: + python-version: "3.12" + + - name: Install uv + uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0 + + - name: Install livekit-api (without livekit-rtc) + run: | + uv venv .failover-venv + source .failover-venv/bin/activate + uv pip install ./livekit-protocol ./livekit-api pytest aiohttp + + - name: Wait for mock server + run: | + for i in $(seq 1 30); do + curl -sf http://127.0.0.1:9999/settings/regions >/dev/null && exit 0 + sleep 1 + done + echo "mock server did not become ready" && exit 1 + + - name: Run API tests + run: | + source .failover-venv/bin/activate + pytest tests/api -v diff --git a/livekit-api/livekit/api/__init__.py b/livekit-api/livekit/api/__init__.py index fb115f6f..040853fd 100644 --- a/livekit-api/livekit/api/__init__.py +++ b/livekit-api/livekit/api/__init__.py @@ -38,6 +38,7 @@ from .twirp_client import TwirpError, TwirpErrorCode from .livekit_api import LiveKitAPI +from ._failover import FailoverConfig from .access_token import ( InferenceGrants, ObservabilityGrants, @@ -66,4 +67,5 @@ "WebhookReceiver", "TwirpError", "TwirpErrorCode", + "FailoverConfig", ] diff --git a/livekit-api/livekit/api/_failover.py b/livekit-api/livekit/api/_failover.py new file mode 100644 index 00000000..553142d2 --- /dev/null +++ b/livekit-api/livekit/api/_failover.py @@ -0,0 +1,165 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Region failover for the Twirp API clients. + +On a retryable failure (any transport error or HTTP 5xx) the client discovers +alternative LiveKit Cloud regions via ``/settings/regions`` and replays the +request against the next region, with exponential backoff. 4xx responses are +returned immediately. +""" + +from __future__ import annotations + +import time +from dataclasses import dataclass +from typing import Dict, List, Optional, TypedDict +from urllib.parse import urlparse + +import aiohttp + +_DEFAULT_MAX_ATTEMPTS = 3 +_DEFAULT_BACKOFF_BASE = 0.2 + + +class FailoverConfig(TypedDict, total=False): + """Region-failover tuning, passed as the ``failover`` argument. Use it as a + plain dict, e.g. ``failover={"max_attempts": 5}``. All keys are optional. + + Keys: + max_attempts: total number of attempts including the initial request — + the original host plus up to ``max_attempts - 1`` fallback regions. + Defaults to 3. Set to 1 to disable failover (a single attempt). + backoff_base: seconds before the first retry; each subsequent retry + doubles it. Defaults to 0.2. + """ + + max_attempts: int + backoff_base: float + + +def failover_attempts(failover: Optional[FailoverConfig], host: Optional[str]) -> int: + """Total request attempts including the initial one; 1 means no failover. + + With no config (``None``) failover is enabled only for LiveKit Cloud hosts. + An explicit config enables it for any host; ``max_attempts=1`` disables it. + """ + if failover is None: + return _DEFAULT_MAX_ATTEMPTS if (bool(host) and is_cloud(host)) else 1 # type: ignore[arg-type] + return max(1, failover.get("max_attempts", _DEFAULT_MAX_ATTEMPTS)) + + +def failover_backoff_base(failover: Optional[FailoverConfig]) -> float: + return (failover or {}).get("backoff_base", _DEFAULT_BACKOFF_BASE) + + +def is_cloud(host: str) -> bool: + # Auto mode only enables failover for LiveKit Cloud project domains. + return host.endswith(".livekit.cloud") + + +def to_http(url: str) -> str: + """Normalizes a region URL to an http(s) scheme (ws -> http, wss -> https).""" + if url.startswith("ws"): + return "http" + url[2:] + return url + + +def origin_of(url: str) -> str: + """Returns the scheme://host[:port] origin of a URL, dropping any path.""" + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" + + +def host_key(url: str) -> str: + """A stable key identifying a host (including port) for dedup across attempts.""" + return urlparse(url).netloc.lower() + + +def pick_next(region_origins: List[str], attempted: set[str]) -> Optional[str]: + """Returns the first region origin whose host has not yet been attempted.""" + for origin in region_origins: + if host_key(origin) not in attempted: + return origin + return None + + +@dataclass +class _CacheEntry: + origins: List[str] + fetched_at: float + ttl: float + + +class RegionCache: + """Process-wide cache of the LiveKit Cloud region list, keyed by host.""" + + def __init__(self) -> None: + self._entries: Dict[str, _CacheEntry] = {} + + async def region_origins( + self, + session: aiohttp.ClientSession, + origin: str, + headers: Dict[str, str], + ) -> List[str]: + """Returns alternative region origins for ``origin``, fetching + ``/settings/regions`` if the cache is stale. Best-effort: on a fetch + failure it serves a stale cached list when available, otherwise an empty + list. Forwards ``headers`` so a valid token — and any test directives — + reach the discovery endpoint.""" + key = host_key(origin) + entry = self._entries.get(key) + if entry is not None and (time.monotonic() - entry.fetched_at) < entry.ttl: + return entry.origins + + try: + origins, ttl = await self._fetch(session, origin, headers) + except Exception: + return entry.origins if entry is not None else [] + + # A zero TTL (e.g. Cache-Control: max-age=0) means "do not cache". + if ttl > 0: + self._entries[key] = _CacheEntry(origins, time.monotonic(), ttl) + return origins + + async def _fetch( + self, + session: aiohttp.ClientSession, + origin: str, + headers: Dict[str, str], + ) -> tuple[List[str], float]: + fetch_headers = { + k: v for k, v in headers.items() if k.lower() not in ("content-type", "content-length") + } + async with session.get(f"{origin}/settings/regions", headers=fetch_headers) as resp: + if resp.status != 200: + raise RuntimeError(f"region discovery failed: {resp.status}") + ttl = _parse_max_age(resp.headers.get("Cache-Control")) + body = await resp.json() + origins = [origin_of(to_http(r["url"])) for r in body.get("regions", []) if r.get("url")] + return origins, ttl + + +def _parse_max_age(cache_control: Optional[str]) -> float: + if not cache_control: + return 0.0 + for directive in cache_control.split(","): + directive = directive.strip().lower() + if directive.startswith("max-age="): + try: + return float(int(directive[len("max-age=") :])) + except ValueError: + return 0.0 + return 0.0 diff --git a/livekit-api/livekit/api/_service.py b/livekit-api/livekit/api/_service.py index 7777f6ee..8d2dc3ac 100644 --- a/livekit-api/livekit/api/_service.py +++ b/livekit-api/livekit/api/_service.py @@ -3,14 +3,23 @@ import aiohttp from abc import ABC from .twirp_client import TwirpClient +from ._failover import FailoverConfig +from typing import Optional from .access_token import AccessToken, VideoGrants, SIPGrants AUTHORIZATION = "authorization" class Service(ABC): - def __init__(self, session: aiohttp.ClientSession, host: str, api_key: str, api_secret: str): - self._client = TwirpClient(session, host, "livekit") + def __init__( + self, + session: aiohttp.ClientSession, + host: str, + api_key: str, + api_secret: str, + failover: Optional[FailoverConfig] = None, + ): + self._client = TwirpClient(session, host, "livekit", failover=failover) self.api_key = api_key self.api_secret = api_secret diff --git a/livekit-api/livekit/api/agent_dispatch_service.py b/livekit-api/livekit/api/agent_dispatch_service.py index 375cc527..05843aec 100644 --- a/livekit-api/livekit/api/agent_dispatch_service.py +++ b/livekit-api/livekit/api/agent_dispatch_service.py @@ -8,6 +8,7 @@ ListAgentDispatchResponse, ) from ._service import Service +from ._failover import FailoverConfig from .access_token import VideoGrants SVC = "AgentDispatchService" @@ -26,8 +27,15 @@ class AgentDispatchService(Service): ``` """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: Optional[FailoverConfig] = None, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def create_dispatch(self, req: CreateAgentDispatchRequest) -> AgentDispatch: """Create an explicit dispatch for an agent to join a room. diff --git a/livekit-api/livekit/api/connector_service.py b/livekit-api/livekit/api/connector_service.py index 2d686e89..1f2f627c 100644 --- a/livekit-api/livekit/api/connector_service.py +++ b/livekit-api/livekit/api/connector_service.py @@ -17,6 +17,8 @@ ConnectTwilioCallResponse, ) from ._service import Service +from ._failover import FailoverConfig +from typing import Optional from .access_token import VideoGrants SVC = "Connector" @@ -35,8 +37,15 @@ class ConnectorService(Service): ``` """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: Optional[FailoverConfig] = None, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def dial_whatsapp_call( self, request: DialWhatsAppCallRequest diff --git a/livekit-api/livekit/api/egress_service.py b/livekit-api/livekit/api/egress_service.py index 491bff3b..c8e67abe 100644 --- a/livekit-api/livekit/api/egress_service.py +++ b/livekit-api/livekit/api/egress_service.py @@ -13,6 +13,8 @@ ListEgressResponse, ) from ._service import Service +from ._failover import FailoverConfig +from typing import Optional from .access_token import VideoGrants SVC = "Egress" @@ -33,8 +35,15 @@ class EgressService(Service): Also see https://docs.livekit.io/home/egress/overview/ """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: Optional[FailoverConfig] = None, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def start_room_composite_egress(self, start: RoomCompositeEgressRequest) -> EgressInfo: """Starts a composite recording of a room.""" diff --git a/livekit-api/livekit/api/ingress_service.py b/livekit-api/livekit/api/ingress_service.py index 296dabbe..e53b85fe 100644 --- a/livekit-api/livekit/api/ingress_service.py +++ b/livekit-api/livekit/api/ingress_service.py @@ -8,6 +8,8 @@ ListIngressResponse, ) from ._service import Service +from ._failover import FailoverConfig +from typing import Optional from .access_token import VideoGrants SVC = "Ingress" @@ -28,8 +30,15 @@ class IngressService(Service): Also see https://docs.livekit.io/home/ingress/overview/ """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: Optional[FailoverConfig] = None, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def create_ingress(self, create: CreateIngressRequest) -> IngressInfo: return await self._client.request( diff --git a/livekit-api/livekit/api/livekit_api.py b/livekit-api/livekit/api/livekit_api.py index 16d26831..a96ce4f4 100644 --- a/livekit-api/livekit/api/livekit_api.py +++ b/livekit-api/livekit/api/livekit_api.py @@ -6,6 +6,7 @@ from .sip_service import SipService from .agent_dispatch_service import AgentDispatchService from .connector_service import ConnectorService +from ._failover import FailoverConfig from typing import Any, Optional @@ -31,6 +32,7 @@ def __init__( *, timeout: Optional[aiohttp.ClientTimeout] = None, session: Optional[aiohttp.ClientSession] = None, + failover: Optional[FailoverConfig] = None, ): """Create a new LiveKitAPI instance. @@ -59,12 +61,14 @@ def __init__( timeout = aiohttp.ClientTimeout(total=60) self._session = aiohttp.ClientSession(timeout=timeout) - self._room = RoomService(self._session, url, api_key, api_secret) - self._ingress = IngressService(self._session, url, api_key, api_secret) - self._egress = EgressService(self._session, url, api_key, api_secret) - self._sip = SipService(self._session, url, api_key, api_secret) - self._agent_dispatch = AgentDispatchService(self._session, url, api_key, api_secret) - self._connector = ConnectorService(self._session, url, api_key, api_secret) + self._room = RoomService(self._session, url, api_key, api_secret, failover) + self._ingress = IngressService(self._session, url, api_key, api_secret, failover) + self._egress = EgressService(self._session, url, api_key, api_secret, failover) + self._sip = SipService(self._session, url, api_key, api_secret, failover) + self._agent_dispatch = AgentDispatchService( + self._session, url, api_key, api_secret, failover + ) + self._connector = ConnectorService(self._session, url, api_key, api_secret, failover) @property def agent_dispatch(self) -> AgentDispatchService: diff --git a/livekit-api/livekit/api/room_service.py b/livekit-api/livekit/api/room_service.py index 5c0e3b76..41084af8 100644 --- a/livekit-api/livekit/api/room_service.py +++ b/livekit-api/livekit/api/room_service.py @@ -25,6 +25,8 @@ ) from livekit.protocol.models import Room, ParticipantInfo from ._service import Service +from ._failover import FailoverConfig +from typing import Optional from .access_token import VideoGrants SVC = "RoomService" @@ -45,8 +47,15 @@ class RoomService(Service): Also see https://docs.livekit.io/home/server/managing-rooms/ and https://docs.livekit.io/home/server/managing-participants/ """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: Optional[FailoverConfig] = None, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def create_room( self, diff --git a/livekit-api/livekit/api/sip_service.py b/livekit-api/livekit/api/sip_service.py index 0364c797..949508be 100644 --- a/livekit-api/livekit/api/sip_service.py +++ b/livekit-api/livekit/api/sip_service.py @@ -35,6 +35,7 @@ SIPTransport, ) from ._service import Service +from ._failover import FailoverConfig from .access_token import VideoGrants, SIPGrants SVC = "SIP" @@ -53,8 +54,15 @@ class SipService(Service): ``` """ - def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): - super().__init__(session, url, api_key, api_secret) + def __init__( + self, + session: aiohttp.ClientSession, + url: str, + api_key: str, + api_secret: str, + failover: Optional[FailoverConfig] = None, + ): + super().__init__(session, url, api_key, api_secret, failover=failover) async def create_inbound_trunk( self, create: CreateSIPInboundTrunkRequest diff --git a/livekit-api/livekit/api/twirp_client.py b/livekit-api/livekit/api/twirp_client.py index ce828c3f..5b440c5e 100644 --- a/livekit-api/livekit/api/twirp_client.py +++ b/livekit-api/livekit/api/twirp_client.py @@ -12,14 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, Type, TypeVar, Optional +import asyncio +from typing import Dict, List, Optional, Type, TypeVar import aiohttp from google.protobuf.message import Message from urllib.parse import urlparse +from ._failover import ( + FailoverConfig, + RegionCache, + failover_attempts, + failover_backoff_base, + host_key, + origin_of, + pick_next, +) + DEFAULT_PREFIX = "twirp" +# Shared across all clients in the process so the region list is fetched once. +_REGION_CACHE = RegionCache() + class TwirpError(Exception): def __init__( @@ -92,6 +106,7 @@ def __init__( host: str, pkg: str, prefix: str = DEFAULT_PREFIX, + failover: Optional[FailoverConfig] = None, ) -> None: parse_res = urlparse(host) scheme = parse_res.scheme @@ -103,6 +118,8 @@ def __init__( self.pkg = pkg self.prefix = prefix self._session = session + self._failover: Optional[FailoverConfig] = failover + self._origin = origin_of(self.host) async def request( self, @@ -114,21 +131,73 @@ async def request( *, timeout: Optional[aiohttp.ClientTimeout] = None, ) -> T: - url = f"{self.host}/{self.prefix}/{self.pkg}.{service}/{method}" + """Issues a Twirp request, failing over to alternative regions on + retryable errors. On any transport error or HTTP 5xx it discovers + regions via ``/settings/regions`` and replays the request — body and + headers intact — against the next untried region, with exponential + backoff. A 4xx is returned immediately.""" + path = f"{self.prefix}/{self.pkg}.{service}/{method}" + forward_headers = dict(headers) # for the discovery fetch (no content-type yet) + headers = dict(headers) headers["Content-Type"] = "application/protobuf" - serialized_data = data.SerializeToString() - async with self._session.post( - url, headers=headers, data=serialized_data, timeout=timeout - ) as resp: - if resp.status == 200: - return response_class.FromString(await resp.read()) - else: - # when we have an error, Twirp always encode it in json - error_data = await resp.json() - raise TwirpError( - error_data.get("code", "unknown"), - error_data.get("msg", ""), - status=resp.status, - metadata=error_data.get("meta"), - ) + + host = urlparse(self._origin).hostname + max_attempts = failover_attempts(self._failover, host) + attempted = {host_key(self._origin)} + region_origins: Optional[List[str]] = None + current_origin = self._origin + + for attempt in range(max_attempts): + is_last = attempt + 1 >= max_attempts + url = f"{current_origin}/{path}" + + transport_exc: Optional[BaseException] = None + retryable_status: Optional[int] = None + error_data: Dict = {} + try: + async with self._session.post( + url, headers=headers, data=serialized_data, timeout=timeout + ) as resp: + if resp.status == 200: + return response_class.FromString(await resp.read()) + # Twirp encodes errors as JSON regardless of content type. + try: + error_data = await resp.json() + except Exception: + error_data = {} + if resp.status < 500: + # 4xx is terminal. + raise self._twirp_error(error_data, resp.status) + retryable_status = resp.status + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + transport_exc = e + + # Only retryable failures (5xx or transport error) reach this point. + next_origin = None + if not is_last: + if region_origins is None: + region_origins = await _REGION_CACHE.region_origins( + self._session, self._origin, forward_headers + ) + next_origin = pick_next(region_origins, attempted) + + if next_origin is None: + if transport_exc is not None: + raise transport_exc + raise self._twirp_error(error_data, retryable_status or 500) + + await asyncio.sleep(failover_backoff_base(self._failover) * (2**attempt)) + attempted.add(host_key(next_origin)) + current_origin = next_origin + + raise RuntimeError("failover loop exited without returning") # unreachable + + @staticmethod + def _twirp_error(error_data: Dict, status: int) -> "TwirpError": + return TwirpError( + error_data.get("code", "unknown"), + error_data.get("msg", ""), + status=status, + metadata=error_data.get("meta"), + ) diff --git a/tests/api/test_failover.py b/tests/api/test_failover.py new file mode 100644 index 00000000..c304cac0 --- /dev/null +++ b/tests/api/test_failover.py @@ -0,0 +1,106 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Region failover tests against the shared mock LiveKit API server +(livekit/livekit cmd/test-server). Point them at a running instance with +LK_TEST_SERVER_URL (default http://127.0.0.1:9999); they skip when no server is +reachable. The mock returns Cache-Control: max-age=0, so the region cache never +stores entries and scenarios don't interfere. + +See cmd/test-server/README.md for the X-Lk-Mock-* control protocol. These tests +drive TwirpClient.request() directly because the public service methods do not +expose per-call headers. +""" + +import asyncio +import os +import urllib.request + +import aiohttp +import pytest + +from typing import Optional + +from livekit.api import CreateRoomRequest, FailoverConfig, Room, TwirpError +from livekit.api.twirp_client import TwirpClient + +BASE = os.getenv("LK_TEST_SERVER_URL", "http://127.0.0.1:9999") + +# An explicit config enables failover on any host (the non-cloud mock) with a +# tiny backoff so the tests run fast. +FORCED: FailoverConfig = {"max_attempts": 3, "backoff_base": 0.001} + + +def _server_up() -> bool: + try: + with urllib.request.urlopen(f"{BASE}/settings/regions", timeout=1) as r: + return r.status == 200 + except Exception: + return False + + +pytestmark = pytest.mark.skipif( + not _server_up(), reason=f"mock test server not reachable at {BASE}" +) + + +async def _call(directives: dict, failover: Optional[FailoverConfig] = FORCED) -> Room: + async with aiohttp.ClientSession() as session: + client = TwirpClient(session, BASE, "livekit", failover=failover) + headers = {"authorization": "Bearer test-token", **directives} + return await client.request("RoomService", "CreateRoom", CreateRoomRequest(), headers, Room) + + +def test_healthy(): + asyncio.run(_call({})) + + +def test_primary_unavailable(): + asyncio.run(_call({"x-lk-mock-fail-regions": "0"})) + + +def test_two_regions_unavailable(): + asyncio.run(_call({"x-lk-mock-fail-regions": "0,1"})) + + +def test_all_unavailable(): + with pytest.raises(TwirpError): + asyncio.run(_call({"x-lk-mock-fail-regions": "0,1,2,3"})) + + +def test_client_error_not_retried(): + with pytest.raises(TwirpError) as exc: + asyncio.run(_call({"x-lk-mock-fail-regions": "0", "x-lk-mock-fail-status": "400"})) + assert exc.value.code == "invalid_argument" + + +def test_transport_error_failover(): + asyncio.run(_call({"x-lk-mock-fail-regions": "0", "x-lk-mock-fail-mode": "drop"})) + + +def test_region_discovery_unreachable(): + with pytest.raises(TwirpError): + asyncio.run(_call({"x-lk-mock-fail-regions": "0", "x-lk-mock-regions-status": "500"})) + + +def test_disabled_for_non_cloud(): + # Default (None) against 127.0.0.1 (non-cloud) must not fail over. + with pytest.raises(TwirpError): + asyncio.run(_call({"x-lk-mock-fail-regions": "0"}, failover=None)) + + +def test_explicitly_disabled(): + # max_attempts=1 (a single attempt) disables failover even with a config. + with pytest.raises(TwirpError): + asyncio.run(_call({"x-lk-mock-fail-regions": "0"}, failover={"max_attempts": 1})) diff --git a/uv.lock b/uv.lock index 1efa8869..e912b879 100644 --- a/uv.lock +++ b/uv.lock @@ -1869,8 +1869,8 @@ dependencies = [ requires-dist = [ { name = "aiofiles", specifier = ">=24" }, { name = "numpy", specifier = ">=1.26" }, - { name = "protobuf", specifier = ">=4.25.0" }, - { name = "types-protobuf", specifier = ">=3" }, + { name = "protobuf", specifier = ">=5" }, + { name = "types-protobuf", specifier = ">=5" }, ] [[package]] @@ -1908,8 +1908,8 @@ dependencies = [ [package.metadata] requires-dist = [ - { name = "protobuf", specifier = ">=4" }, - { name = "types-protobuf", specifier = ">=4" }, + { name = "protobuf", specifier = ">=5" }, + { name = "types-protobuf", specifier = ">=5" }, ] [[package]] From 13a09dd9d9c05597f83dafda1c750947879e923f Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 28 Jun 2026 09:12:39 +0200 Subject: [PATCH 2/5] tweaks --- livekit-api/livekit/api/__init__.py | 4 ++-- livekit-api/livekit/api/_failover.py | 14 ++++++++++---- livekit-api/livekit/api/_service.py | 4 ++-- .../livekit/api/agent_dispatch_service.py | 4 ++-- livekit-api/livekit/api/connector_service.py | 4 ++-- livekit-api/livekit/api/egress_service.py | 4 ++-- livekit-api/livekit/api/ingress_service.py | 4 ++-- livekit-api/livekit/api/livekit_api.py | 4 ++-- livekit-api/livekit/api/room_service.py | 4 ++-- livekit-api/livekit/api/sip_service.py | 4 ++-- livekit-api/livekit/api/twirp_client.py | 16 +++++++++++++--- tests/api/test_failover.py | 6 +++--- 12 files changed, 44 insertions(+), 28 deletions(-) diff --git a/livekit-api/livekit/api/__init__.py b/livekit-api/livekit/api/__init__.py index 040853fd..04f6e345 100644 --- a/livekit-api/livekit/api/__init__.py +++ b/livekit-api/livekit/api/__init__.py @@ -38,7 +38,7 @@ from .twirp_client import TwirpError, TwirpErrorCode from .livekit_api import LiveKitAPI -from ._failover import FailoverConfig +from ._failover import FailoverOptions from .access_token import ( InferenceGrants, ObservabilityGrants, @@ -67,5 +67,5 @@ "WebhookReceiver", "TwirpError", "TwirpErrorCode", - "FailoverConfig", + "FailoverOptions", ] diff --git a/livekit-api/livekit/api/_failover.py b/livekit-api/livekit/api/_failover.py index 553142d2..aeff965b 100644 --- a/livekit-api/livekit/api/_failover.py +++ b/livekit-api/livekit/api/_failover.py @@ -33,7 +33,7 @@ _DEFAULT_BACKOFF_BASE = 0.2 -class FailoverConfig(TypedDict, total=False): +class FailoverOptions(TypedDict, total=False): """Region-failover tuning, passed as the ``failover`` argument. Use it as a plain dict, e.g. ``failover={"max_attempts": 5}``. All keys are optional. @@ -49,7 +49,7 @@ class FailoverConfig(TypedDict, total=False): backoff_base: float -def failover_attempts(failover: Optional[FailoverConfig], host: Optional[str]) -> int: +def failover_attempts(failover: Optional[FailoverOptions], host: Optional[str]) -> int: """Total request attempts including the initial one; 1 means no failover. With no config (``None``) failover is enabled only for LiveKit Cloud hosts. @@ -60,7 +60,7 @@ def failover_attempts(failover: Optional[FailoverConfig], host: Optional[str]) - return max(1, failover.get("max_attempts", _DEFAULT_MAX_ATTEMPTS)) -def failover_backoff_base(failover: Optional[FailoverConfig]) -> float: +def failover_backoff_base(failover: Optional[FailoverOptions]) -> float: return (failover or {}).get("backoff_base", _DEFAULT_BACKOFF_BASE) @@ -143,7 +143,13 @@ async def _fetch( fetch_headers = { k: v for k, v in headers.items() if k.lower() not in ("content-type", "content-length") } - async with session.get(f"{origin}/settings/regions", headers=fetch_headers) as resp: + # Short timeout so a slow/unreachable discovery endpoint doesn't stall + # the failover path. + async with session.get( + f"{origin}/settings/regions", + headers=fetch_headers, + timeout=aiohttp.ClientTimeout(total=2), + ) as resp: if resp.status != 200: raise RuntimeError(f"region discovery failed: {resp.status}") ttl = _parse_max_age(resp.headers.get("Cache-Control")) diff --git a/livekit-api/livekit/api/_service.py b/livekit-api/livekit/api/_service.py index 8d2dc3ac..a4956b95 100644 --- a/livekit-api/livekit/api/_service.py +++ b/livekit-api/livekit/api/_service.py @@ -3,7 +3,7 @@ import aiohttp from abc import ABC from .twirp_client import TwirpClient -from ._failover import FailoverConfig +from ._failover import FailoverOptions from typing import Optional from .access_token import AccessToken, VideoGrants, SIPGrants @@ -17,7 +17,7 @@ def __init__( host: str, api_key: str, api_secret: str, - failover: Optional[FailoverConfig] = None, + failover: Optional[FailoverOptions] = None, ): self._client = TwirpClient(session, host, "livekit", failover=failover) self.api_key = api_key diff --git a/livekit-api/livekit/api/agent_dispatch_service.py b/livekit-api/livekit/api/agent_dispatch_service.py index 05843aec..1704f6e4 100644 --- a/livekit-api/livekit/api/agent_dispatch_service.py +++ b/livekit-api/livekit/api/agent_dispatch_service.py @@ -8,7 +8,7 @@ ListAgentDispatchResponse, ) from ._service import Service -from ._failover import FailoverConfig +from ._failover import FailoverOptions from .access_token import VideoGrants SVC = "AgentDispatchService" @@ -33,7 +33,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverConfig] = None, + failover: Optional[FailoverOptions] = None, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/connector_service.py b/livekit-api/livekit/api/connector_service.py index 1f2f627c..1016bd2e 100644 --- a/livekit-api/livekit/api/connector_service.py +++ b/livekit-api/livekit/api/connector_service.py @@ -17,7 +17,7 @@ ConnectTwilioCallResponse, ) from ._service import Service -from ._failover import FailoverConfig +from ._failover import FailoverOptions from typing import Optional from .access_token import VideoGrants @@ -43,7 +43,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverConfig] = None, + failover: Optional[FailoverOptions] = None, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/egress_service.py b/livekit-api/livekit/api/egress_service.py index c8e67abe..8c51c5b1 100644 --- a/livekit-api/livekit/api/egress_service.py +++ b/livekit-api/livekit/api/egress_service.py @@ -13,7 +13,7 @@ ListEgressResponse, ) from ._service import Service -from ._failover import FailoverConfig +from ._failover import FailoverOptions from typing import Optional from .access_token import VideoGrants @@ -41,7 +41,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverConfig] = None, + failover: Optional[FailoverOptions] = None, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/ingress_service.py b/livekit-api/livekit/api/ingress_service.py index e53b85fe..274dbc1e 100644 --- a/livekit-api/livekit/api/ingress_service.py +++ b/livekit-api/livekit/api/ingress_service.py @@ -8,7 +8,7 @@ ListIngressResponse, ) from ._service import Service -from ._failover import FailoverConfig +from ._failover import FailoverOptions from typing import Optional from .access_token import VideoGrants @@ -36,7 +36,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverConfig] = None, + failover: Optional[FailoverOptions] = None, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/livekit_api.py b/livekit-api/livekit/api/livekit_api.py index a96ce4f4..36d4a07b 100644 --- a/livekit-api/livekit/api/livekit_api.py +++ b/livekit-api/livekit/api/livekit_api.py @@ -6,7 +6,7 @@ from .sip_service import SipService from .agent_dispatch_service import AgentDispatchService from .connector_service import ConnectorService -from ._failover import FailoverConfig +from ._failover import FailoverOptions from typing import Any, Optional @@ -32,7 +32,7 @@ def __init__( *, timeout: Optional[aiohttp.ClientTimeout] = None, session: Optional[aiohttp.ClientSession] = None, - failover: Optional[FailoverConfig] = None, + failover: Optional[FailoverOptions] = None, ): """Create a new LiveKitAPI instance. diff --git a/livekit-api/livekit/api/room_service.py b/livekit-api/livekit/api/room_service.py index 41084af8..10d8f477 100644 --- a/livekit-api/livekit/api/room_service.py +++ b/livekit-api/livekit/api/room_service.py @@ -25,7 +25,7 @@ ) from livekit.protocol.models import Room, ParticipantInfo from ._service import Service -from ._failover import FailoverConfig +from ._failover import FailoverOptions from typing import Optional from .access_token import VideoGrants @@ -53,7 +53,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverConfig] = None, + failover: Optional[FailoverOptions] = None, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/sip_service.py b/livekit-api/livekit/api/sip_service.py index 949508be..29fb97a4 100644 --- a/livekit-api/livekit/api/sip_service.py +++ b/livekit-api/livekit/api/sip_service.py @@ -35,7 +35,7 @@ SIPTransport, ) from ._service import Service -from ._failover import FailoverConfig +from ._failover import FailoverOptions from .access_token import VideoGrants, SIPGrants SVC = "SIP" @@ -60,7 +60,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverConfig] = None, + failover: Optional[FailoverOptions] = None, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/twirp_client.py b/livekit-api/livekit/api/twirp_client.py index 5b440c5e..92b80511 100644 --- a/livekit-api/livekit/api/twirp_client.py +++ b/livekit-api/livekit/api/twirp_client.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +import logging from typing import Dict, List, Optional, Type, TypeVar import aiohttp @@ -20,7 +21,7 @@ from urllib.parse import urlparse from ._failover import ( - FailoverConfig, + FailoverOptions, RegionCache, failover_attempts, failover_backoff_base, @@ -31,6 +32,8 @@ DEFAULT_PREFIX = "twirp" +logger = logging.getLogger("livekit") + # Shared across all clients in the process so the region list is fetched once. _REGION_CACHE = RegionCache() @@ -106,7 +109,7 @@ def __init__( host: str, pkg: str, prefix: str = DEFAULT_PREFIX, - failover: Optional[FailoverConfig] = None, + failover: Optional[FailoverOptions] = None, ) -> None: parse_res = urlparse(host) scheme = parse_res.scheme @@ -118,7 +121,7 @@ def __init__( self.pkg = pkg self.prefix = prefix self._session = session - self._failover: Optional[FailoverConfig] = failover + self._failover: Optional[FailoverOptions] = failover self._origin = origin_of(self.host) async def request( @@ -187,6 +190,13 @@ async def request( raise transport_exc raise self._twirp_error(error_data, retryable_status or 500) + reason = transport_exc if transport_exc is not None else f"status {retryable_status}" + logger.warning( + "livekit API request to %s failed (%s), retrying with fallback url %s", + current_origin, + reason, + next_origin, + ) await asyncio.sleep(failover_backoff_base(self._failover) * (2**attempt)) attempted.add(host_key(next_origin)) current_origin = next_origin diff --git a/tests/api/test_failover.py b/tests/api/test_failover.py index c304cac0..c8cb0867 100644 --- a/tests/api/test_failover.py +++ b/tests/api/test_failover.py @@ -32,14 +32,14 @@ from typing import Optional -from livekit.api import CreateRoomRequest, FailoverConfig, Room, TwirpError +from livekit.api import CreateRoomRequest, FailoverOptions, Room, TwirpError from livekit.api.twirp_client import TwirpClient BASE = os.getenv("LK_TEST_SERVER_URL", "http://127.0.0.1:9999") # An explicit config enables failover on any host (the non-cloud mock) with a # tiny backoff so the tests run fast. -FORCED: FailoverConfig = {"max_attempts": 3, "backoff_base": 0.001} +FORCED: FailoverOptions = {"max_attempts": 3, "backoff_base": 0.001} def _server_up() -> bool: @@ -55,7 +55,7 @@ def _server_up() -> bool: ) -async def _call(directives: dict, failover: Optional[FailoverConfig] = FORCED) -> Room: +async def _call(directives: dict, failover: Optional[FailoverOptions] = FORCED) -> Room: async with aiohttp.ClientSession() as session: client = TwirpClient(session, BASE, "livekit", failover=failover) headers = {"authorization": "Bearer test-token", **directives} From a1436224e30ac5f9e789df4889872463778d2fad Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 28 Jun 2026 16:41:29 +0200 Subject: [PATCH 3/5] remove options to simplify --- livekit-api/livekit/api/__init__.py | 2 - livekit-api/livekit/api/_failover.py | 43 +++++-------------- livekit-api/livekit/api/_service.py | 4 +- .../livekit/api/agent_dispatch_service.py | 3 +- livekit-api/livekit/api/connector_service.py | 4 +- livekit-api/livekit/api/egress_service.py | 4 +- livekit-api/livekit/api/ingress_service.py | 4 +- livekit-api/livekit/api/livekit_api.py | 3 +- livekit-api/livekit/api/room_service.py | 3 +- livekit-api/livekit/api/sip_service.py | 3 +- livekit-api/livekit/api/twirp_client.py | 17 +++++--- tests/api/test_failover.py | 33 +++++++------- 12 files changed, 48 insertions(+), 75 deletions(-) diff --git a/livekit-api/livekit/api/__init__.py b/livekit-api/livekit/api/__init__.py index 04f6e345..fb115f6f 100644 --- a/livekit-api/livekit/api/__init__.py +++ b/livekit-api/livekit/api/__init__.py @@ -38,7 +38,6 @@ from .twirp_client import TwirpError, TwirpErrorCode from .livekit_api import LiveKitAPI -from ._failover import FailoverOptions from .access_token import ( InferenceGrants, ObservabilityGrants, @@ -67,5 +66,4 @@ "WebhookReceiver", "TwirpError", "TwirpErrorCode", - "FailoverOptions", ] diff --git a/livekit-api/livekit/api/_failover.py b/livekit-api/livekit/api/_failover.py index aeff965b..ce6c089c 100644 --- a/livekit-api/livekit/api/_failover.py +++ b/livekit-api/livekit/api/_failover.py @@ -24,48 +24,27 @@ import time from dataclasses import dataclass -from typing import Dict, List, Optional, TypedDict +from typing import Dict, List, Optional from urllib.parse import urlparse import aiohttp -_DEFAULT_MAX_ATTEMPTS = 3 -_DEFAULT_BACKOFF_BASE = 0.2 +FAILOVER_MAX_ATTEMPTS = 3 +FAILOVER_BACKOFF_BASE = 0.2 # seconds -class FailoverOptions(TypedDict, total=False): - """Region-failover tuning, passed as the ``failover`` argument. Use it as a - plain dict, e.g. ``failover={"max_attempts": 5}``. All keys are optional. - - Keys: - max_attempts: total number of attempts including the initial request — - the original host plus up to ``max_attempts - 1`` fallback regions. - Defaults to 3. Set to 1 to disable failover (a single attempt). - backoff_base: seconds before the first retry; each subsequent retry - doubles it. Defaults to 0.2. - """ - - max_attempts: int - backoff_base: float - - -def failover_attempts(failover: Optional[FailoverOptions], host: Optional[str]) -> int: - """Total request attempts including the initial one; 1 means no failover. - - With no config (``None``) failover is enabled only for LiveKit Cloud hosts. - An explicit config enables it for any host; ``max_attempts=1`` disables it. +def failover_attempts(enabled: bool, host: Optional[str], force: bool = False) -> int: + """Total request attempts for a host; 1 means no failover. Failover only + engages when enabled and the host is a LiveKit Cloud domain. ``force`` + bypasses the cloud-host check and is for internal testing only. """ - if failover is None: - return _DEFAULT_MAX_ATTEMPTS if (bool(host) and is_cloud(host)) else 1 # type: ignore[arg-type] - return max(1, failover.get("max_attempts", _DEFAULT_MAX_ATTEMPTS)) - - -def failover_backoff_base(failover: Optional[FailoverOptions]) -> float: - return (failover or {}).get("backoff_base", _DEFAULT_BACKOFF_BASE) + if enabled and (force or (bool(host) and is_cloud(host))): + return FAILOVER_MAX_ATTEMPTS + return 1 def is_cloud(host: str) -> bool: - # Auto mode only enables failover for LiveKit Cloud project domains. + # Failover only engages for LiveKit Cloud project domains. return host.endswith(".livekit.cloud") diff --git a/livekit-api/livekit/api/_service.py b/livekit-api/livekit/api/_service.py index a4956b95..a198c388 100644 --- a/livekit-api/livekit/api/_service.py +++ b/livekit-api/livekit/api/_service.py @@ -3,8 +3,6 @@ import aiohttp from abc import ABC from .twirp_client import TwirpClient -from ._failover import FailoverOptions -from typing import Optional from .access_token import AccessToken, VideoGrants, SIPGrants AUTHORIZATION = "authorization" @@ -17,7 +15,7 @@ def __init__( host: str, api_key: str, api_secret: str, - failover: Optional[FailoverOptions] = None, + failover: bool = True, ): self._client = TwirpClient(session, host, "livekit", failover=failover) self.api_key = api_key diff --git a/livekit-api/livekit/api/agent_dispatch_service.py b/livekit-api/livekit/api/agent_dispatch_service.py index 1704f6e4..d014e6d0 100644 --- a/livekit-api/livekit/api/agent_dispatch_service.py +++ b/livekit-api/livekit/api/agent_dispatch_service.py @@ -8,7 +8,6 @@ ListAgentDispatchResponse, ) from ._service import Service -from ._failover import FailoverOptions from .access_token import VideoGrants SVC = "AgentDispatchService" @@ -33,7 +32,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverOptions] = None, + failover: bool = True, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/connector_service.py b/livekit-api/livekit/api/connector_service.py index 1016bd2e..d5eaf2a1 100644 --- a/livekit-api/livekit/api/connector_service.py +++ b/livekit-api/livekit/api/connector_service.py @@ -17,8 +17,6 @@ ConnectTwilioCallResponse, ) from ._service import Service -from ._failover import FailoverOptions -from typing import Optional from .access_token import VideoGrants SVC = "Connector" @@ -43,7 +41,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverOptions] = None, + failover: bool = True, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/egress_service.py b/livekit-api/livekit/api/egress_service.py index 8c51c5b1..12f81c3e 100644 --- a/livekit-api/livekit/api/egress_service.py +++ b/livekit-api/livekit/api/egress_service.py @@ -13,8 +13,6 @@ ListEgressResponse, ) from ._service import Service -from ._failover import FailoverOptions -from typing import Optional from .access_token import VideoGrants SVC = "Egress" @@ -41,7 +39,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverOptions] = None, + failover: bool = True, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/ingress_service.py b/livekit-api/livekit/api/ingress_service.py index 274dbc1e..b29ce460 100644 --- a/livekit-api/livekit/api/ingress_service.py +++ b/livekit-api/livekit/api/ingress_service.py @@ -8,8 +8,6 @@ ListIngressResponse, ) from ._service import Service -from ._failover import FailoverOptions -from typing import Optional from .access_token import VideoGrants SVC = "Ingress" @@ -36,7 +34,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverOptions] = None, + failover: bool = True, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/livekit_api.py b/livekit-api/livekit/api/livekit_api.py index 36d4a07b..bc0fcfaf 100644 --- a/livekit-api/livekit/api/livekit_api.py +++ b/livekit-api/livekit/api/livekit_api.py @@ -6,7 +6,6 @@ from .sip_service import SipService from .agent_dispatch_service import AgentDispatchService from .connector_service import ConnectorService -from ._failover import FailoverOptions from typing import Any, Optional @@ -32,7 +31,7 @@ def __init__( *, timeout: Optional[aiohttp.ClientTimeout] = None, session: Optional[aiohttp.ClientSession] = None, - failover: Optional[FailoverOptions] = None, + failover: bool = True, ): """Create a new LiveKitAPI instance. diff --git a/livekit-api/livekit/api/room_service.py b/livekit-api/livekit/api/room_service.py index 10d8f477..c23b6a60 100644 --- a/livekit-api/livekit/api/room_service.py +++ b/livekit-api/livekit/api/room_service.py @@ -25,7 +25,6 @@ ) from livekit.protocol.models import Room, ParticipantInfo from ._service import Service -from ._failover import FailoverOptions from typing import Optional from .access_token import VideoGrants @@ -53,7 +52,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverOptions] = None, + failover: bool = True, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/sip_service.py b/livekit-api/livekit/api/sip_service.py index 29fb97a4..36d215f3 100644 --- a/livekit-api/livekit/api/sip_service.py +++ b/livekit-api/livekit/api/sip_service.py @@ -35,7 +35,6 @@ SIPTransport, ) from ._service import Service -from ._failover import FailoverOptions from .access_token import VideoGrants, SIPGrants SVC = "SIP" @@ -60,7 +59,7 @@ def __init__( url: str, api_key: str, api_secret: str, - failover: Optional[FailoverOptions] = None, + failover: bool = True, ): super().__init__(session, url, api_key, api_secret, failover=failover) diff --git a/livekit-api/livekit/api/twirp_client.py b/livekit-api/livekit/api/twirp_client.py index 92b80511..1c8d9966 100644 --- a/livekit-api/livekit/api/twirp_client.py +++ b/livekit-api/livekit/api/twirp_client.py @@ -21,10 +21,9 @@ from urllib.parse import urlparse from ._failover import ( - FailoverOptions, + FAILOVER_BACKOFF_BASE, RegionCache, failover_attempts, - failover_backoff_base, host_key, origin_of, pick_next, @@ -109,7 +108,10 @@ def __init__( host: str, pkg: str, prefix: str = DEFAULT_PREFIX, - failover: Optional[FailoverOptions] = None, + failover: bool = True, + *, + _failover_force: bool = False, + _failover_backoff: float = FAILOVER_BACKOFF_BASE, ) -> None: parse_res = urlparse(host) scheme = parse_res.scheme @@ -121,7 +123,10 @@ def __init__( self.pkg = pkg self.prefix = prefix self._session = session - self._failover: Optional[FailoverOptions] = failover + # _failover_force / _failover_backoff are internal test-only knobs. + self._failover = failover + self._failover_force = _failover_force + self._failover_backoff = _failover_backoff self._origin = origin_of(self.host) async def request( @@ -146,7 +151,7 @@ async def request( serialized_data = data.SerializeToString() host = urlparse(self._origin).hostname - max_attempts = failover_attempts(self._failover, host) + max_attempts = failover_attempts(self._failover, host, self._failover_force) attempted = {host_key(self._origin)} region_origins: Optional[List[str]] = None current_origin = self._origin @@ -197,7 +202,7 @@ async def request( reason, next_origin, ) - await asyncio.sleep(failover_backoff_base(self._failover) * (2**attempt)) + await asyncio.sleep(self._failover_backoff * (2**attempt)) attempted.add(host_key(next_origin)) current_origin = next_origin diff --git a/tests/api/test_failover.py b/tests/api/test_failover.py index c8cb0867..ce68e043 100644 --- a/tests/api/test_failover.py +++ b/tests/api/test_failover.py @@ -30,17 +30,11 @@ import aiohttp import pytest -from typing import Optional - -from livekit.api import CreateRoomRequest, FailoverOptions, Room, TwirpError +from livekit.api import CreateRoomRequest, Room, TwirpError from livekit.api.twirp_client import TwirpClient BASE = os.getenv("LK_TEST_SERVER_URL", "http://127.0.0.1:9999") -# An explicit config enables failover on any host (the non-cloud mock) with a -# tiny backoff so the tests run fast. -FORCED: FailoverOptions = {"max_attempts": 3, "backoff_base": 0.001} - def _server_up() -> bool: try: @@ -55,9 +49,18 @@ def _server_up() -> bool: ) -async def _call(directives: dict, failover: Optional[FailoverOptions] = FORCED) -> Room: +# _failover_force bypasses the cloud-host check (the mock is on 127.0.0.1) and a +# tiny backoff keeps the tests fast — both are internal, test-only knobs. +async def _call(directives: dict, *, failover: bool = True, force: bool = True) -> Room: async with aiohttp.ClientSession() as session: - client = TwirpClient(session, BASE, "livekit", failover=failover) + client = TwirpClient( + session, + BASE, + "livekit", + failover=failover, + _failover_force=force, + _failover_backoff=0.001, + ) headers = {"authorization": "Bearer test-token", **directives} return await client.request("RoomService", "CreateRoom", CreateRoomRequest(), headers, Room) @@ -94,13 +97,13 @@ def test_region_discovery_unreachable(): asyncio.run(_call({"x-lk-mock-fail-regions": "0", "x-lk-mock-regions-status": "500"})) -def test_disabled_for_non_cloud(): - # Default (None) against 127.0.0.1 (non-cloud) must not fail over. +def test_not_cloud_host(): + # Enabled but not forced; 127.0.0.1 is not a cloud host, so no failover. with pytest.raises(TwirpError): - asyncio.run(_call({"x-lk-mock-fail-regions": "0"}, failover=None)) + asyncio.run(_call({"x-lk-mock-fail-regions": "0"}, force=False)) -def test_explicitly_disabled(): - # max_attempts=1 (a single attempt) disables failover even with a config. +def test_disabled(): + # failover=False disables failover entirely. with pytest.raises(TwirpError): - asyncio.run(_call({"x-lk-mock-fail-regions": "0"}, failover={"max_attempts": 1})) + asyncio.run(_call({"x-lk-mock-fail-regions": "0"}, failover=False)) From 006ed399159498ed7a3f259256d1dce44839604f Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 28 Jun 2026 16:44:44 +0200 Subject: [PATCH 4/5] fix ruff --- livekit-api/livekit/api/room_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/livekit-api/livekit/api/room_service.py b/livekit-api/livekit/api/room_service.py index c23b6a60..e3cb7bb6 100644 --- a/livekit-api/livekit/api/room_service.py +++ b/livekit-api/livekit/api/room_service.py @@ -25,7 +25,6 @@ ) from livekit.protocol.models import Room, ParticipantInfo from ._service import Service -from typing import Optional from .access_token import VideoGrants SVC = "RoomService" From b118ac7b1d73b6db43bab87293cc8e6bb8366bc1 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 28 Jun 2026 16:58:48 +0200 Subject: [PATCH 5/5] fix types --- livekit-api/livekit/api/_failover.py | 2 +- livekit-api/livekit/api/access_token.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/livekit-api/livekit/api/_failover.py b/livekit-api/livekit/api/_failover.py index ce6c089c..49d239ed 100644 --- a/livekit-api/livekit/api/_failover.py +++ b/livekit-api/livekit/api/_failover.py @@ -38,7 +38,7 @@ def failover_attempts(enabled: bool, host: Optional[str], force: bool = False) - engages when enabled and the host is a LiveKit Cloud domain. ``force`` bypasses the cloud-host check and is for internal testing only. """ - if enabled and (force or (bool(host) and is_cloud(host))): + if enabled and (force or (host is not None and is_cloud(host))): return FAILOVER_MAX_ATTEMPTS return 1 diff --git a/livekit-api/livekit/api/access_token.py b/livekit-api/livekit/api/access_token.py index 86766c3a..e6516819 100644 --- a/livekit-api/livekit/api/access_token.py +++ b/livekit-api/livekit/api/access_token.py @@ -271,9 +271,10 @@ def verify(self, token: str, *, verify_signature: bool = True) -> Claims: if claims.get("roomPreset"): grant_claims.room_preset = claims.get("roomPreset") - if claims.get("roomConfig"): + room_config = claims.get("roomConfig") + if room_config: grant_claims.room_config = ParseDict( - claims.get("roomConfig"), + room_config, RoomConfiguration(), ignore_unknown_fields=True, )