Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions .github/workflows/test-api.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2026 LiveKit, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Test API

permissions:
contents: read

on:
workflow_dispatch:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
failover:
runs-on: ubuntu-latest
services:
mock-server:
image: livekit/test-server:latest
ports:
- 9999:9999
- 10000:10000
- 10001:10001
- 10002:10002
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
submodules: true

- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6
with:
python-version: "3.12"

- name: Install uv
uses: astral-sh/setup-uv@08807647e7069bb48b6ef5acd8ec9567f424441b # v8.1.0

- name: Install livekit-api (without livekit-rtc)
run: |
uv venv .failover-venv
source .failover-venv/bin/activate
uv pip install ./livekit-protocol ./livekit-api pytest aiohttp

- name: Wait for mock server
run: |
for i in $(seq 1 30); do
curl -sf http://127.0.0.1:9999/settings/regions >/dev/null && exit 0
sleep 1
done
echo "mock server did not become ready" && exit 1

- name: Run API tests
run: |
source .failover-venv/bin/activate
pytest tests/api -v
2 changes: 2 additions & 0 deletions livekit-api/livekit/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from .twirp_client import TwirpError, TwirpErrorCode
from .livekit_api import LiveKitAPI
from ._failover import FailoverConfig
from .access_token import (
InferenceGrants,
ObservabilityGrants,
Expand Down Expand Up @@ -66,4 +67,5 @@
"WebhookReceiver",
"TwirpError",
"TwirpErrorCode",
"FailoverConfig",
]
165 changes: 165 additions & 0 deletions livekit-api/livekit/api/_failover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Copyright 2026 LiveKit, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Region failover for the Twirp API clients.

On a retryable failure (any transport error or HTTP 5xx) the client discovers
alternative LiveKit Cloud regions via ``/settings/regions`` and replays the
request against the next region, with exponential backoff. 4xx responses are
returned immediately.
"""

from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Dict, List, Optional, TypedDict
from urllib.parse import urlparse

import aiohttp

_DEFAULT_MAX_ATTEMPTS = 3
_DEFAULT_BACKOFF_BASE = 0.2


class FailoverConfig(TypedDict, total=False):
"""Region-failover tuning, passed as the ``failover`` argument. Use it as a
plain dict, e.g. ``failover={"max_attempts": 5}``. All keys are optional.

Keys:
max_attempts: total number of attempts including the initial request —
the original host plus up to ``max_attempts - 1`` fallback regions.
Defaults to 3. Set to 1 to disable failover (a single attempt).
backoff_base: seconds before the first retry; each subsequent retry
doubles it. Defaults to 0.2.
"""

max_attempts: int
backoff_base: float


def failover_attempts(failover: Optional[FailoverConfig], host: Optional[str]) -> int:
"""Total request attempts including the initial one; 1 means no failover.

With no config (``None``) failover is enabled only for LiveKit Cloud hosts.
An explicit config enables it for any host; ``max_attempts=1`` disables it.
"""
if failover is None:
return _DEFAULT_MAX_ATTEMPTS if (bool(host) and is_cloud(host)) else 1 # type: ignore[arg-type]
return max(1, failover.get("max_attempts", _DEFAULT_MAX_ATTEMPTS))


def failover_backoff_base(failover: Optional[FailoverConfig]) -> float:
return (failover or {}).get("backoff_base", _DEFAULT_BACKOFF_BASE)


def is_cloud(host: str) -> bool:
# Auto mode only enables failover for LiveKit Cloud project domains.
return host.endswith(".livekit.cloud")


def to_http(url: str) -> str:
"""Normalizes a region URL to an http(s) scheme (ws -> http, wss -> https)."""
if url.startswith("ws"):
return "http" + url[2:]
return url


def origin_of(url: str) -> str:
"""Returns the scheme://host[:port] origin of a URL, dropping any path."""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"


def host_key(url: str) -> str:
"""A stable key identifying a host (including port) for dedup across attempts."""
return urlparse(url).netloc.lower()


def pick_next(region_origins: List[str], attempted: set[str]) -> Optional[str]:
"""Returns the first region origin whose host has not yet been attempted."""
for origin in region_origins:
if host_key(origin) not in attempted:
return origin
return None


@dataclass
class _CacheEntry:
origins: List[str]
fetched_at: float
ttl: float


class RegionCache:
"""Process-wide cache of the LiveKit Cloud region list, keyed by host."""

def __init__(self) -> None:
self._entries: Dict[str, _CacheEntry] = {}

async def region_origins(
self,
session: aiohttp.ClientSession,
origin: str,
headers: Dict[str, str],
) -> List[str]:
"""Returns alternative region origins for ``origin``, fetching
``/settings/regions`` if the cache is stale. Best-effort: on a fetch
failure it serves a stale cached list when available, otherwise an empty
list. Forwards ``headers`` so a valid token — and any test directives —
reach the discovery endpoint."""
key = host_key(origin)
entry = self._entries.get(key)
if entry is not None and (time.monotonic() - entry.fetched_at) < entry.ttl:
return entry.origins

try:
origins, ttl = await self._fetch(session, origin, headers)
except Exception:
return entry.origins if entry is not None else []

# A zero TTL (e.g. Cache-Control: max-age=0) means "do not cache".
if ttl > 0:
self._entries[key] = _CacheEntry(origins, time.monotonic(), ttl)
return origins

async def _fetch(
self,
session: aiohttp.ClientSession,
origin: str,
headers: Dict[str, str],
) -> tuple[List[str], float]:
fetch_headers = {
k: v for k, v in headers.items() if k.lower() not in ("content-type", "content-length")
}
async with session.get(f"{origin}/settings/regions", headers=fetch_headers) as resp:
if resp.status != 200:
raise RuntimeError(f"region discovery failed: {resp.status}")
ttl = _parse_max_age(resp.headers.get("Cache-Control"))
body = await resp.json()
origins = [origin_of(to_http(r["url"])) for r in body.get("regions", []) if r.get("url")]
return origins, ttl


def _parse_max_age(cache_control: Optional[str]) -> float:
if not cache_control:
return 0.0
for directive in cache_control.split(","):
directive = directive.strip().lower()
if directive.startswith("max-age="):
try:
return float(int(directive[len("max-age=") :]))
except ValueError:
return 0.0
return 0.0
13 changes: 11 additions & 2 deletions livekit-api/livekit/api/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,23 @@
import aiohttp
from abc import ABC
from .twirp_client import TwirpClient
from ._failover import FailoverConfig
from typing import Optional
from .access_token import AccessToken, VideoGrants, SIPGrants

AUTHORIZATION = "authorization"


class Service(ABC):
def __init__(self, session: aiohttp.ClientSession, host: str, api_key: str, api_secret: str):
self._client = TwirpClient(session, host, "livekit")
def __init__(
self,
session: aiohttp.ClientSession,
host: str,
api_key: str,
api_secret: str,
failover: Optional[FailoverConfig] = None,
):
self._client = TwirpClient(session, host, "livekit", failover=failover)
self.api_key = api_key
self.api_secret = api_secret

Expand Down
12 changes: 10 additions & 2 deletions livekit-api/livekit/api/agent_dispatch_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ListAgentDispatchResponse,
)
from ._service import Service
from ._failover import FailoverConfig
from .access_token import VideoGrants

SVC = "AgentDispatchService"
Expand All @@ -26,8 +27,15 @@ class AgentDispatchService(Service):
```
"""

def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
super().__init__(session, url, api_key, api_secret)
def __init__(
self,
session: aiohttp.ClientSession,
url: str,
api_key: str,
api_secret: str,
failover: Optional[FailoverConfig] = None,
):
super().__init__(session, url, api_key, api_secret, failover=failover)

async def create_dispatch(self, req: CreateAgentDispatchRequest) -> AgentDispatch:
"""Create an explicit dispatch for an agent to join a room.
Expand Down
13 changes: 11 additions & 2 deletions livekit-api/livekit/api/connector_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
ConnectTwilioCallResponse,
)
from ._service import Service
from ._failover import FailoverConfig
from typing import Optional
from .access_token import VideoGrants

SVC = "Connector"
Expand All @@ -35,8 +37,15 @@ class ConnectorService(Service):
```
"""

def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
super().__init__(session, url, api_key, api_secret)
def __init__(
self,
session: aiohttp.ClientSession,
url: str,
api_key: str,
api_secret: str,
failover: Optional[FailoverConfig] = None,
):
super().__init__(session, url, api_key, api_secret, failover=failover)

async def dial_whatsapp_call(
self, request: DialWhatsAppCallRequest
Expand Down
13 changes: 11 additions & 2 deletions livekit-api/livekit/api/egress_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
ListEgressResponse,
)
from ._service import Service
from ._failover import FailoverConfig
from typing import Optional
from .access_token import VideoGrants

SVC = "Egress"
Expand All @@ -33,8 +35,15 @@ class EgressService(Service):
Also see https://docs.livekit.io/home/egress/overview/
"""

def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
super().__init__(session, url, api_key, api_secret)
def __init__(
self,
session: aiohttp.ClientSession,
url: str,
api_key: str,
api_secret: str,
failover: Optional[FailoverConfig] = None,
):
super().__init__(session, url, api_key, api_secret, failover=failover)

async def start_room_composite_egress(self, start: RoomCompositeEgressRequest) -> EgressInfo:
"""Starts a composite recording of a room."""
Expand Down
13 changes: 11 additions & 2 deletions livekit-api/livekit/api/ingress_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
ListIngressResponse,
)
from ._service import Service
from ._failover import FailoverConfig
from typing import Optional
from .access_token import VideoGrants

SVC = "Ingress"
Expand All @@ -28,8 +30,15 @@ class IngressService(Service):
Also see https://docs.livekit.io/home/ingress/overview/
"""

def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str):
super().__init__(session, url, api_key, api_secret)
def __init__(
self,
session: aiohttp.ClientSession,
url: str,
api_key: str,
api_secret: str,
failover: Optional[FailoverConfig] = None,
):
super().__init__(session, url, api_key, api_secret, failover=failover)

async def create_ingress(self, create: CreateIngressRequest) -> IngressInfo:
return await self._client.request(
Expand Down
Loading
Loading