Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ implementations are available:
proactively once the token nears its JWT `exp` (default leeway 60s) and reactively on
`401`. Pass `account_id` to request a token scoped to a specific account
(`?account.id=<id>`); use one provider instance per account scope.
- `AccountScopedAuthentication` β€” an always account-scoped token (`account_id` is required)
backed by a process-wide cache keyed by `(secret, account_id)`. Several provider or client
instances for the same account reuse a single cached token, and refreshes are serialized
per account, so concurrent callers trigger at most one token request. It refreshes
proactively (default leeway 60s) and reactively on `401`. Use this when many clients share
the same account scope or when many requests run concurrently.

## Instantiate The Client

Expand Down Expand Up @@ -85,6 +91,20 @@ client = MPTClient.from_config(
)
```

With an account-scoped token shared across clients and concurrent requests:

```python
from mpt_api_client import MPTClient, AccountScopedAuthentication

client = MPTClient.from_config(
authentication=AccountScopedAuthentication(
secret="<extension-secret>",
account_id="<account-id>",
),
base_url="https://api.s1.show/public",
)
```

`from_config` also accepts a `timeout` argument (HTTP request timeout in seconds, default `60.0`).

## Synchronous Usage Patterns
Expand Down
2 changes: 2 additions & 0 deletions mpt_api_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from mpt_api_client.auth import (
AccountScopedAuthentication,
Authentication,
BearerTokenAuthentication,
ExtensionFrameworkAuthentication,
Expand All @@ -7,6 +8,7 @@
from mpt_api_client.rql import RQLQuery

__all__ = [ # noqa: WPS410
"AccountScopedAuthentication",
"AsyncMPTClient",
"Authentication",
"BearerTokenAuthentication",
Expand Down
9 changes: 8 additions & 1 deletion mpt_api_client/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
from mpt_api_client.auth.base import Authentication, BearerTokenAuthentication
from mpt_api_client.auth.account_scoped import AccountScopedAuthentication
from mpt_api_client.auth.base import (
Authentication,
BearerTokenAuthentication,
InstallationTokenAuthentication,
)
from mpt_api_client.auth.extension_framework import ExtensionFrameworkAuthentication

__all__ = [ # noqa: WPS410
"AccountScopedAuthentication",
"Authentication",
"BearerTokenAuthentication",
"ExtensionFrameworkAuthentication",
"InstallationTokenAuthentication",
]
171 changes: 171 additions & 0 deletions mpt_api_client/auth/account_scoped.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""Account-scoped authentication for the MPT integration API.

This provider fetches account-scoped installation tokens and shares them across instances
through a process-wide cache keyed by ``(secret, account_id)``. Token fetches are serialized
per account, so concurrent callers for the same account trigger at most one token request.
"""

import asyncio
import datetime as dt
import threading
from collections.abc import AsyncGenerator, Generator
from dataclasses import dataclass
from typing import ClassVar, override

import httpx

from mpt_api_client.auth.base import InstallationTokenAuthentication
from mpt_api_client.exceptions import MPTError

DEFAULT_TOKEN_VALIDITY_LEEWAY_SECONDS = 60

CacheKey = tuple[str, str]


@dataclass(frozen=True)
class _CachedToken:
"""A cached account token together with its decoded expiry."""

token: str
expires_at: dt.datetime | None


class AccountScopedAuthentication(InstallationTokenAuthentication): # noqa: WPS214
"""Authenticate with an account-scoped token from a shared, concurrency-safe cache.

Tokens are cached process-wide keyed by ``(secret, account_id)``, so several provider or
client instances for the same account reuse a single token. Refresh is serialized per
account through a lock with double-checked caching: concurrent callers for the same
account trigger at most one token request. Refresh happens proactively once the token is
within ``min_remaining_validity_seconds`` of its JWT ``exp`` claim, with a reactive
refresh on ``401 Unauthorized`` for tokens revoked before they expire. When the fetched
token carries no readable ``exp`` claim, proactive refresh is skipped and only the
reactive ``401`` path applies.

The token call is delegated to :class:`InstallationsTokenService` (and its async
counterpart) over a dedicated client authenticated with the extension secret; that
client's base URL is supplied by the owning HTTP client through :meth:`configure`.
"""

_token_cache: ClassVar[dict[CacheKey, _CachedToken]] = {}
_sync_locks: ClassVar[dict[CacheKey, threading.Lock]] = {}
_async_locks: ClassVar[dict[CacheKey, asyncio.Lock]] = {}

def __init__(
self,
secret: str,
account_id: str,
min_remaining_validity_seconds: int = DEFAULT_TOKEN_VALIDITY_LEEWAY_SECONDS,
) -> None:
"""Initialize the provider.

Args:
secret: Extension secret used to authenticate token requests.
account_id: Account the requested token is scoped to.
min_remaining_validity_seconds: Proactive refresh leeway before the JWT ``exp``.
"""
super().__init__(secret)
self._account_id = account_id
self._min_remaining_validity_seconds = min_remaining_validity_seconds
Comment on lines +54 to +69

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟑 Minor | ⚑ Quick win

Validate non-negative refresh leeway at construction.

min_remaining_validity_seconds is used directly in expiry threshold math (Line 149). If a negative value is passed, expired tokens can be treated as reusable. Add a constructor guard so invalid input fails fast.

Proposed fix
     def __init__(
         self,
         secret: str,
         account_id: str,
         min_remaining_validity_seconds: int = DEFAULT_TOKEN_VALIDITY_LEEWAY_SECONDS,
     ) -> None:
@@
         """
         super().__init__(secret)
+        if min_remaining_validity_seconds < 0:
+            raise ValueError("min_remaining_validity_seconds must be >= 0")
         self._account_id = account_id
         self._min_remaining_validity_seconds = min_remaining_validity_seconds

Also applies to: 149-150

πŸ€– Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@mpt_api_client/auth/account_scoped.py` around lines 54 - 69, Add validation
in the __init__ method of the AccountScopedTokenProvider class to ensure the
min_remaining_validity_seconds parameter is non-negative. Perform this check
after the super().__init__ call but before assigning the value to
self._min_remaining_validity_seconds. Raise a ValueError if a negative value is
provided to fail fast and prevent expired tokens from being incorrectly treated
as reusable in the expiry threshold calculations downstream.


@classmethod
def clear_cache(cls) -> None:
"""Clear all cached account tokens and refresh locks."""
cls._token_cache.clear()
cls._sync_locks.clear()
cls._async_locks.clear()

@override
def sync_auth_flow(
self, request: httpx.Request
) -> Generator[httpx.Request, httpx.Response, None]:
"""Attach an account-scoped token, refreshing it proactively and on 401."""
token = self._token_sync()
request.headers["Authorization"] = f"Bearer {token}"
response = yield request
if response.status_code == httpx.codes.UNAUTHORIZED:
rejected = request.headers["Authorization"].removeprefix("Bearer ")
request.headers["Authorization"] = f"Bearer {self._token_sync(rejected)}"
yield request

@override
async def async_auth_flow(
self, request: httpx.Request
) -> AsyncGenerator[httpx.Request, httpx.Response]:
"""Attach an account-scoped token, refreshing it proactively and on 401."""
token = await self._token_async()
request.headers["Authorization"] = f"Bearer {token}"
response = yield request
if response.status_code == httpx.codes.UNAUTHORIZED:
rejected = request.headers["Authorization"].removeprefix("Bearer ")
refreshed = await self._token_async(rejected)
request.headers["Authorization"] = f"Bearer {refreshed}"
yield request

@property
def _cache_key(self) -> CacheKey:
"""Return the shared-cache key for this provider's scope."""
return self._secret, self._account_id

def _token_sync(self, rejected: str | None = None) -> str:
"""Return a usable token, fetching one under a per-account lock when needed."""
cached = self._token_cache.get(self._cache_key)
if self._is_usable(cached, rejected):
return cached.token # type: ignore[union-attr]

lock = self._sync_locks.setdefault(self._cache_key, threading.Lock())
with lock:
cached = self._token_cache.get(self._cache_key)
if self._is_usable(cached, rejected):
return cached.token # type: ignore[union-attr]
fetched = self._get_sync_service().token(self._account_id)
return self._store(fetched.token)

async def _token_async(self, rejected: str | None = None) -> str:
"""Return a usable token, fetching one under a per-account lock when needed."""
cached = self._token_cache.get(self._cache_key)
if self._is_usable(cached, rejected):
return cached.token # type: ignore[union-attr]

lock = self._async_locks.setdefault(self._cache_key, asyncio.Lock())
async with lock:
cached = self._token_cache.get(self._cache_key)
if self._is_usable(cached, rejected):
return cached.token # type: ignore[union-attr]
fetched = await self._get_async_service().token(self._account_id)
return self._store(fetched.token)

def _is_usable(self, cached: _CachedToken | None, rejected: str | None) -> bool:
"""Return whether the cached token can be reused for the current request.

A token is unusable when it is missing, when it equals a token the server just
rejected, or when it is within the proactive refresh leeway of its expiry. Tokens
without a readable ``exp`` are reused and rely on the reactive ``401`` path.
"""
if cached is None or cached.token == rejected:
return False
if cached.expires_at is None:
return True
threshold = dt.datetime.now(dt.UTC).timestamp() + self._min_remaining_validity_seconds
return cached.expires_at.timestamp() > threshold

def _store(self, token: str | None) -> str:
"""Cache a freshly fetched token, evicting expired entries, and return it."""
if not token:
raise MPTError("Installations token endpoint returned an empty token.")
self._token_cache[self._cache_key] = _CachedToken(token, self._read_expiry(token))
self._evict_expired()
return token

def _evict_expired(self) -> None:
"""Drop cache entries (and their locks) whose tokens have already expired."""
now = dt.datetime.now(dt.UTC)
expired_keys = [
key
for key, cached in self._token_cache.items()
if cached.expires_at is not None and cached.expires_at <= now
]
for key in expired_keys:
self._token_cache.pop(key, None)
self._sync_locks.pop(key, None)
self._async_locks.pop(key, None)
Comment on lines +152 to +171

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, locate and read the AccountScopedAuthentication class
find . -name "account_scoped.py" -type f | head -20

Repository: softwareone-platform/mpt-api-python-client

Length of output: 126


🏁 Script executed:

#!/bin/bash
# Read the account_scoped.py file to understand the implementation
cat -n mpt_api_client/auth/account_scoped.py

Repository: softwareone-platform/mpt-api-python-client

Length of output: 8785


🏁 Script executed:

#!/bin/bash
# Search for all references to _token_cache, _sync_locks, _async_locks across the codebase
rg "(_token_cache|_sync_locks|_async_locks)" -n --context 2

Repository: softwareone-platform/mpt-api-python-client

Length of output: 4855


🏁 Script executed:

#!/bin/bash
# Look for threading patterns and Lock usage in the auth module
rg "threading\.|Lock\(\)|asyncio\.Lock\(\)" mpt_api_client/auth/ -n --context 1

Repository: softwareone-platform/mpt-api-python-client

Length of output: 855


🏁 Script executed:

#!/bin/bash
# Check if the _cache_guard fix has been applied
grep -n "_cache_guard" mpt_api_client/auth/account_scoped.py

Repository: softwareone-platform/mpt-api-python-client

Length of output: 68


🏁 Script executed:

#!/bin/bash
# Look for any test coverage of concurrent account access
grep -r "concurrent\|thread\|async.*test" tests/unit/auth/test_account_scoped.py -i

Repository: softwareone-platform/mpt-api-python-client

Length of output: 217


🏁 Script executed:

#!/bin/bash
# Check what the concurrency test actually tests
sed -n '210,280p' tests/unit/auth/test_account_scoped.py

Repository: softwareone-platform/mpt-api-python-client

Length of output: 2213


🏁 Script executed:

#!/bin/bash
# Check if there are any tests mixing different accounts with concurrent access
grep -A 30 "test_serialized_refresh" tests/unit/auth/test_account_scoped.py

Repository: softwareone-platform/mpt-api-python-client

Length of output: 1320


🏁 Script executed:

#!/bin/bash
# Check if there are any tests that exercise multiple accounts concurrently
grep -B 5 -A 15 "ACC-OLD\|ACC-NEW" tests/unit/auth/test_account_scoped.py | head -40

Repository: softwareone-platform/mpt-api-python-client

Length of output: 715


🏁 Script executed:

#!/bin/bash
# Look for documentation or usage patterns showing multiple accounts
grep -r "AccountScopedAuthentication" --include="*.py" --include="*.md" | grep -v "test_" | grep -v ".pyc"

Repository: softwareone-platform/mpt-api-python-client

Length of output: 752


🏁 Script executed:

#!/bin/bash
# Check if the concurrency test uses multiple accounts
grep -B 5 -A 20 "test_serialized_refresh_fetches_token_once_under_concurrency" tests/unit/auth/test_account_scoped.py

Repository: softwareone-platform/mpt-api-python-client

Length of output: 1065


🏁 Script executed:

#!/bin/bash
# Look at the full test file structure to understand concurrency coverage
grep "^def test_\|^async def test_" tests/unit/auth/test_account_scoped.py

Repository: softwareone-platform/mpt-api-python-client

Length of output: 670


🏁 Script executed:

#!/bin/bash
# Check the full test_different_accounts_fetch_separate_tokens to see if it's concurrent
grep -B 5 -A 25 "test_different_accounts_fetch_separate_tokens" tests/unit/auth/test_account_scoped.py

Repository: softwareone-platform/mpt-api-python-client

Length of output: 1136


🏁 Script executed:

#!/bin/bash
# Run the test suite to verify the current state of the code
# First, let's check if there's a way to run tests and see if they would catch this race
grep -r "pytest\|test" docs/testing.md | head -10

Repository: softwareone-platform/mpt-api-python-client

Length of output: 715


Serialize global cache eviction to prevent concurrent-dict runtime failures.

Line 165 iterates over AccountScopedAuthentication._token_cache while lines 156/169 mutate that same shared dict from request threads handling different account keys. Per-account locks (one per cache key) do not protect this global iteration, so concurrent writes to _token_cache from different accounts can crash with dictionary-size-change errors during iteration.

The test test_different_accounts_fetch_separate_tokens() only exercises sequential access across accounts; test_serialized_refresh_fetches_token_once_under_concurrency() only covers concurrency within a single account (protected by its per-account lock). No test validates concurrent access across multiple accounts, where the race window exists.

Proposed fix
 class AccountScopedAuthentication(InstallationTokenAuthentication):  # noqa: WPS214
@@
     _token_cache: ClassVar[dict[CacheKey, _CachedToken]] = {}
     _sync_locks: ClassVar[dict[CacheKey, threading.Lock]] = {}
     _async_locks: ClassVar[dict[CacheKey, asyncio.Lock]] = {}
+    _cache_guard: ClassVar[threading.Lock] = threading.Lock()
@@
     def clear_cache(cls) -> None:
         """Clear all cached account tokens and refresh locks."""
-        cls._token_cache.clear()
-        cls._sync_locks.clear()
-        cls._async_locks.clear()
+        with cls._cache_guard:
+            cls._token_cache.clear()
+            cls._sync_locks.clear()
+            cls._async_locks.clear()
@@
     def _store(self, token: str | None) -> str:
         """Cache a freshly fetched token, evicting expired entries, and return it."""
         if not token:
             raise MPTError("Installations token endpoint returned an empty token.")
-        self._token_cache[self._cache_key] = _CachedToken(token, self._read_expiry(token))
-        self._evict_expired()
+        with self._cache_guard:
+            self._token_cache[self._cache_key] = _CachedToken(token, self._read_expiry(token))
+            self._evict_expired()
         return token
πŸ€– Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@mpt_api_client/auth/account_scoped.py` around lines 152 - 171, The
_evict_expired() method iterates over the shared _token_cache dictionary while
the _store() method (and potentially other threads) concurrently mutate it from
different accounts, causing dictionary-size-change errors during iteration. The
existing per-account locks protect only individual cache keys, not the global
cache itself. Add a global lock (separate from the per-account sync/async locks)
to serialize access to _token_cache during the iteration in _evict_expired(),
and acquire this same lock in _store() before modifying _token_cache to ensure
thread-safe concurrent access across different account keys.

90 changes: 90 additions & 0 deletions mpt_api_client/auth/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,27 @@
the sync and the async HTTP clients.
"""

import datetime as dt
from collections.abc import Generator
from typing import override

import httpx

from mpt_api_client.auth.jwt import (
JWTClaimsError,
JWTFormatError,
decode_unverified_jwt_claims,
)
from mpt_api_client.exceptions import MPTError
from mpt_api_client.http import AsyncHTTPClient, HTTPClient
from mpt_api_client.resources.integration.installations_token import (
AsyncInstallationsTokenService,
InstallationsTokenService,
)

DEFAULT_TOKEN_CLIENT_TIMEOUT_SECONDS = 20.0
DEFAULT_TOKEN_CLIENT_RETRIES = 5


class Authentication(httpx.Auth):
"""Base class for MPT API authentication providers."""
Expand Down Expand Up @@ -38,3 +54,77 @@ def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Re
"""Attach the bearer token to the outgoing request."""
request.headers["Authorization"] = f"Bearer {self._token}"
yield request


class InstallationTokenAuthentication(Authentication):
"""Base for providers backed by the integration installations token endpoint.

Holds the extension secret, captures the owning client's configuration through
:meth:`configure`, and lazily builds a dedicated token client (authenticated with the
extension secret) that hosts :class:`InstallationsTokenService` and its async counterpart.
Subclasses implement the caching and ``auth_flow`` behavior.
"""

def __init__(self, secret: str) -> None:
"""Initialize the provider.

Args:
secret: Extension secret used to authenticate token requests.
"""
self._secret = secret
self._base_url: str | None = None
self._timeout: float = DEFAULT_TOKEN_CLIENT_TIMEOUT_SECONDS
self._retries: int = DEFAULT_TOKEN_CLIENT_RETRIES
self._sync_service: InstallationsTokenService | None = None
self._async_service: AsyncInstallationsTokenService | None = None

@override
def configure(self, *, base_url: str, timeout: float, retries: int) -> None:
"""Store the owning client's configuration used to build the token client."""
self._base_url = base_url
self._timeout = timeout
self._retries = retries

def _get_sync_service(self) -> InstallationsTokenService:
"""Return the cached sync token service, building it on first use."""
if self._sync_service is None:
token_client = HTTPClient(
authentication=BearerTokenAuthentication(self._secret),
base_url=self._require_base_url(),
timeout=self._timeout,
retries=self._retries,
)
self._sync_service = InstallationsTokenService(http_client=token_client)
return self._sync_service

def _get_async_service(self) -> AsyncInstallationsTokenService:
"""Return the cached async token service, building it on first use."""
if self._async_service is None:
token_client = AsyncHTTPClient(
authentication=BearerTokenAuthentication(self._secret),
base_url=self._require_base_url(),
timeout=self._timeout,
retries=self._retries,
)
self._async_service = AsyncInstallationsTokenService(http_client=token_client)
return self._async_service

def _require_base_url(self) -> str:
"""Return the configured base URL, raising when the provider is unconfigured."""
if self._base_url is None:
raise MPTError(
f"{type(self).__name__} must be used with an MPT HTTPClient or AsyncHTTPClient; "
"the base URL was not configured.",
)
return self._base_url

def _read_expiry(self, token: str) -> dt.datetime | None:
"""Read the ``exp`` claim from the token, ignoring tokens without one."""
try:
claims = decode_unverified_jwt_claims(token)
except (JWTFormatError, JWTClaimsError):
return None
exp = claims.get("exp")
if not isinstance(exp, int):
return None
return dt.datetime.fromtimestamp(exp, tz=dt.UTC)
Loading
Loading