diff --git a/agentex/src/api/routes/agent_api_keys.py b/agentex/src/api/routes/agent_api_keys.py index 4048ec61..c236850c 100644 --- a/agentex/src/api/routes/agent_api_keys.py +++ b/agentex/src/api/routes/agent_api_keys.py @@ -2,14 +2,29 @@ from fastapi import APIRouter, HTTPException, Query +from src.adapters.crud_store.exceptions import ItemDoesNotExist from src.api.schemas.agent_api_keys import ( AgentAPIKey, CreateAPIKeyRequest, CreateAPIKeyResponse, ) +from src.api.schemas.authorization_types import ( + AgentexResource, + AgentexResourceType, + AuthorizedOperationType, +) from src.domain.entities.agent_api_keys import AgentAPIKeyType +from src.domain.services.authorization_service import DAuthorizationService from src.domain.use_cases.agent_api_keys_use_case import DAgentAPIKeysUseCase from src.domain.use_cases.agents_use_case import DAgentsUseCase +from src.utils.agent_api_key_authorization import ( + API_KEY_NOT_FOUND_MESSAGE, + _check_api_key_or_collapse_to_404, +) +from src.utils.authorization_shortcuts import ( + DAuthorizedId, + DAuthorizedResourceIds, +) from src.utils.logging import make_logger logger = make_logger(__name__) @@ -28,6 +43,7 @@ async def create_api_key( request: CreateAPIKeyRequest, agent_api_key_use_case: DAgentAPIKeysUseCase, agent_use_case: DAgentsUseCase, + authorization_service: DAuthorizationService, ) -> CreateAPIKeyResponse: if not request.agent_id and not request.agent_name: raise HTTPException( @@ -40,6 +56,14 @@ async def create_api_key( detail="Only one of 'agent_id' or 'agent_name' should be provided to create an agent api_key.", ) agent = await agent_use_case.get(id=request.agent_id, name=request.agent_name) + + # No api_key resource exists yet, so SpiceDB can't gate transitively — + # gate on parent ``agent.update`` directly. + await authorization_service.check( + resource=AgentexResource.agent(agent.id), + operation=AuthorizedOperationType.update, + ) + # Check if external agent API key already exists for this name and agent ID existing_api_key = await agent_api_key_use_case.get_by_agent_id_and_name( agent_id=agent.id, @@ -77,6 +101,9 @@ async def create_api_key( async def list_agent_api_keys( agent_api_key_use_case: DAgentAPIKeysUseCase, agent_use_case: DAgentsUseCase, + authorized_api_key_ids: DAuthorizedResourceIds( + AgentexResourceType.api_key, AuthorizedOperationType.read + ), agent_id: str | None = None, agent_name: str | None = None, limit: int = Query(default=50, ge=1, le=1000), @@ -93,8 +120,13 @@ async def list_agent_api_keys( detail="Only one of 'agent_id' or 'agent_name' should be provided to list agent api_keys.", ) agent = await agent_use_case.get(id=agent_id, name=agent_name) + # ``id`` filter runs at the SQL layer so limit/offset apply post-filter. + # ``None`` = authz declined to enumerate (e.g. bypass); pass through. agent_api_key_entities = await agent_api_key_use_case.list( - agent_id=agent.id, limit=limit, page_number=page_number + agent_id=agent.id, + limit=limit, + page_number=page_number, + id=authorized_api_key_ids, ) return [ AgentAPIKey.model_validate(agent_api_key_entity) @@ -112,6 +144,7 @@ async def get_agent_api_key_by_name( name: str, agent_api_key_use_case: DAgentAPIKeysUseCase, agent_use_case: DAgentsUseCase, + authorization_service: DAuthorizationService, agent_id: str | None = None, agent_name: str | None = None, api_key_type: AgentAPIKeyType = AgentAPIKeyType.EXTERNAL, @@ -131,10 +164,16 @@ async def get_agent_api_key_by_name( agent_id=agent.id, name=name, api_key_type=api_key_type ) if not agent_api_key_entity: - raise HTTPException( - status_code=404, - detail=f"Agent api_key '{name}' not found for agent ID {agent.id}", - ) + # Absent and denied 404s must be byte-for-byte identical — see + # ``API_KEY_NOT_FOUND_MESSAGE``. + raise ItemDoesNotExist(API_KEY_NOT_FOUND_MESSAGE) + # Composite lookup key ``(agent_id, name, api_key_type)`` doesn't fit + # ``DAuthorizedName`` — apply the collapse inline. + await _check_api_key_or_collapse_to_404( + authorization_service, + agent_api_key_entity.id, + AuthorizedOperationType.read, + ) return AgentAPIKey.model_validate(agent_api_key_entity) @@ -147,6 +186,11 @@ async def get_agent_api_key_by_name( async def get_agent_api_key( id: str, agent_api_key_use_case: DAgentAPIKeysUseCase, + _authorized_id: DAuthorizedId( + AgentexResourceType.api_key, + AuthorizedOperationType.read, + param_name="id", + ), ) -> AgentAPIKey: agent_api_key_entity = await agent_api_key_use_case.get(id=id) return AgentAPIKey.model_validate(agent_api_key_entity) @@ -161,7 +205,14 @@ async def get_agent_api_key( async def delete_agent_api_key( id: str, agent_api_key_use_case: DAgentAPIKeysUseCase, + _authorized_id: DAuthorizedId( + AgentexResourceType.api_key, + AuthorizedOperationType.delete, + param_name="id", + ), ) -> str: + # SpiceDB's ``api_key.delete`` expands transitively to + # ``parent_agent->update``, so the dep above enforces both factors. await agent_api_key_use_case.delete(id=id) return f"Agent API key with ID {id} deleted" @@ -176,6 +227,7 @@ async def delete_agent_api_key_by_name( api_key_name: str, agent_api_key_use_case: DAgentAPIKeysUseCase, agent_use_case: DAgentsUseCase, + authorization_service: DAuthorizationService, agent_id: str | None = None, agent_name: str | None = None, api_key_type: AgentAPIKeyType = AgentAPIKeyType.EXTERNAL, @@ -191,10 +243,21 @@ async def delete_agent_api_key_by_name( detail="Only one of 'agent_id' or 'agent_name' should be provided to delete an agent api_key.", ) agent = await agent_use_case.get(id=agent_id, name=agent_name) - await agent_api_key_use_case.delete_by_agent_id_and_key_name( - agent_id=agent.id, - key_name=api_key_name, - api_key_type=api_key_type, + + # Resolve name -> id, check, then delete by the resolved id. Deleting by + # name would race: if the row were replaced between check and delete, the + # check would evaluate the old id but the mutation would land on the new + # one. + existing = await agent_api_key_use_case.get_by_agent_id_and_name( + agent_id=agent.id, name=api_key_name, api_key_type=api_key_type + ) + if not existing: + raise ItemDoesNotExist(API_KEY_NOT_FOUND_MESSAGE) + await _check_api_key_or_collapse_to_404( + authorization_service, + existing.id, + AuthorizedOperationType.delete, ) + await agent_api_key_use_case.delete(id=existing.id) return f"Agent api_key '{api_key_name}' deleted" diff --git a/agentex/src/api/schemas/authorization_types.py b/agentex/src/api/schemas/authorization_types.py index 727050a0..c150a4c3 100644 --- a/agentex/src/api/schemas/authorization_types.py +++ b/agentex/src/api/schemas/authorization_types.py @@ -66,7 +66,5 @@ def api_key(cls, selector: str | None = None) -> "AgentexResourceOptionalSelecto return cls(type=AgentexResourceType.api_key, selector=selector) @classmethod - def schedule( - cls, selector: str | None = None - ) -> "AgentexResourceOptionalSelector": + def schedule(cls, selector: str | None = None) -> "AgentexResourceOptionalSelector": return cls(type=AgentexResourceType.schedule, selector=selector) diff --git a/agentex/src/domain/use_cases/agent_api_keys_use_case.py b/agentex/src/domain/use_cases/agent_api_keys_use_case.py index e6af6e51..88c23d79 100644 --- a/agentex/src/domain/use_cases/agent_api_keys_use_case.py +++ b/agentex/src/domain/use_cases/agent_api_keys_use_case.py @@ -246,10 +246,23 @@ async def delete_by_agent_name_and_key_name( await self._deregister_api_key_from_auth(api_key_id=existing.id) async def list( - self, agent_id: str, limit: int, page_number: int + self, + agent_id: str, + limit: int, + page_number: int, + id: list[str] | None = None, ) -> list[AgentAPIKeyEntity]: + # ``id`` is the FGAC-filter shape used by route deps (DAuthorizedResourceIds). + # ``None`` means "no filter" (e.g. authz bypass); an empty list means + # "caller can see no api_keys" and must short-circuit to avoid the + # base repo translating ``id=[]`` into an unfiltered query. + if id is not None and not id: + return [] + filters: dict = {"agent_id": agent_id} + if id is not None: + filters["id"] = id return await self.agent_api_key_repo.list( - limit=limit, page_number=page_number, filters={"agent_id": agent_id} + limit=limit, page_number=page_number, filters=filters ) async def forward_agent_request( diff --git a/agentex/src/utils/agent_api_key_authorization.py b/agentex/src/utils/agent_api_key_authorization.py new file mode 100644 index 00000000..c9837d39 --- /dev/null +++ b/agentex/src/utils/agent_api_key_authorization.py @@ -0,0 +1,29 @@ +from src.adapters.authorization.exceptions import AuthorizationError +from src.adapters.crud_store.exceptions import ItemDoesNotExist +from src.api.schemas.authorization_types import ( + AgentexResource, + AuthorizedOperationType, +) + +# Identifier-free 404 detail, reused by the denied-resource branch below and +# by the name routes when the row is absent — keeps both 404s indistinguishable. +API_KEY_NOT_FOUND_MESSAGE = "Agent api_key not found." + + +async def _check_api_key_or_collapse_to_404( + authorization, + api_key_id: str, + operation: AuthorizedOperationType, +) -> None: + """Check an api_key resource; collapse any denial to 404 to avoid leaking + cross-tenant existence. Mirrors ``_check_task_or_collapse_to_404``. + + TODO(AGX1-290): restore the 403/404 split once api_keys carry tenant scope. + """ + try: + await authorization.check( + resource=AgentexResource.api_key(api_key_id), + operation=operation, + ) + except AuthorizationError: + raise ItemDoesNotExist(API_KEY_NOT_FOUND_MESSAGE) from None diff --git a/agentex/src/utils/authorization_shortcuts.py b/agentex/src/utils/authorization_shortcuts.py index efa627d9..1175c275 100644 --- a/agentex/src/utils/authorization_shortcuts.py +++ b/agentex/src/utils/authorization_shortcuts.py @@ -16,6 +16,7 @@ from src.domain.repositories.task_repository import DTaskRepository from src.domain.repositories.task_state_repository import DTaskStateRepository from src.domain.services.authorization_service import DAuthorizationService +from src.utils.agent_api_key_authorization import _check_api_key_or_collapse_to_404 async def _get_parent_task_id( @@ -72,6 +73,12 @@ async def _ensure_authorized_id( raise ItemDoesNotExist( f"Item with id '{resource_id}' does not exist." ) from None + elif resource_type == AgentexResourceType.api_key: + # Collapse api_key denials to 404 so name/id probes can't + # distinguish "present in another tenant" from "absent". + await _check_api_key_or_collapse_to_404( + authorization, resource_id, operation + ) else: # For direct resources, check directly await authorization.check( @@ -157,7 +164,9 @@ async def _ensure_authorized_body_field( operation=operation, ) except AuthorizationError: - raise ItemDoesNotExist(f"Item with id '{field_value}' does not exist.") from None + raise ItemDoesNotExist( + f"Item with id '{field_value}' does not exist." + ) from None else: await authorization.check( resource=AgentexResource(type=resource_type, selector=field_value), diff --git a/agentex/tests/unit/api/test_agent_api_keys_authz.py b/agentex/tests/unit/api/test_agent_api_keys_authz.py new file mode 100644 index 00000000..3dd59e4b --- /dev/null +++ b/agentex/tests/unit/api/test_agent_api_keys_authz.py @@ -0,0 +1,381 @@ +"""AGX1-263 — agent_api_keys route migration to Spark AuthZ. + +Asserts the route layer issues correct ``check`` calls and collapses denials +to 404. Two-factor SpiceDB expansion is owned by spark-authz; not tested here. +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest +from src.adapters.authorization.exceptions import AuthorizationError +from src.adapters.crud_store.exceptions import ItemDoesNotExist +from src.api.schemas.authorization_types import ( + AgentexResource, + AgentexResourceType, + AuthorizedOperationType, +) +from src.utils.agent_api_key_authorization import _check_api_key_or_collapse_to_404 +from src.utils.authorization_shortcuts import DAuthorizedId + + +def _dep_callable(annotation): + """Pull the inner FastAPI dependency function out of an ``Annotated[str, Depends(...)]``.""" + return annotation.__metadata__[0].dependency + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestCheckApiKeyOrCollapseTo404: + """Helper collapses every denial to 404 (no cross-tenant existence leak).""" + + async def test_allowed_check_returns_normally(self): + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + await _check_api_key_or_collapse_to_404( + authorization, + "api-key-1", + AuthorizedOperationType.read, + ) + + authorization.check.assert_awaited_once() + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.api_key("api-key-1") + assert called_kwargs["operation"] == AuthorizedOperationType.read + + async def test_denied_collapses_to_not_found_regardless_of_existence(self): + """Denial surfaces as 404 regardless of existence.""" + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + + with pytest.raises(ItemDoesNotExist): + await _check_api_key_or_collapse_to_404( + authorization, + "api-key-1", + AuthorizedOperationType.delete, + ) + + async def test_uses_delete_operation_on_delete_routes(self): + """Helper forwards the operation verbatim.""" + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + await _check_api_key_or_collapse_to_404( + authorization, + "api-key-2", + AuthorizedOperationType.delete, + ) + + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["operation"] == AuthorizedOperationType.delete + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestDAuthorizedIdApiKeyWrap: + """``DAuthorizedId(api_key, ...)`` routes through the collapse wrap → 404 on denial.""" + + async def test_api_key_id_routes_through_wrap_on_denial(self): + annotation = DAuthorizedId( + AgentexResourceType.api_key, + AuthorizedOperationType.read, + param_name="id", + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + event_repository = MagicMock() + state_repository = MagicMock() + + message_repository = MagicMock() + with pytest.raises(ItemDoesNotExist): + await dep( + authorization, + event_repository, + state_repository, + message_repository, + "api-key-7", + ) + + async def test_api_key_id_returns_resource_id_when_allowed(self): + annotation = DAuthorizedId( + AgentexResourceType.api_key, + AuthorizedOperationType.read, + param_name="id", + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + result = await dep( + authorization, MagicMock(), MagicMock(), MagicMock(), "api-key-9" + ) + + assert result == "api-key-9" + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.api_key("api-key-9") + + async def test_api_key_delete_op_propagated_to_check(self): + """Delete op is forwarded to ``authorization.check``.""" + annotation = DAuthorizedId( + AgentexResourceType.api_key, + AuthorizedOperationType.delete, + param_name="id", + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + await dep(authorization, MagicMock(), MagicMock(), MagicMock(), "api-key-del") + + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["operation"] == AuthorizedOperationType.delete + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestNameRouteCollapse: + """Name-route handlers call the collapse helper inline → 404 on denial.""" + + async def test_get_by_name_handler_collapses_denial_to_404(self): + from src.api.routes.agent_api_keys import get_agent_api_key_by_name + + agent_use_case = MagicMock() + agent_use_case.get = AsyncMock(return_value=MagicMock(id="agent-1")) + api_key_use_case = MagicMock() + api_key_use_case.get_by_agent_id_and_name = AsyncMock( + return_value=MagicMock(id="api-key-named", name="prod-key") + ) + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + + with pytest.raises(ItemDoesNotExist): + await get_agent_api_key_by_name( + name="prod-key", + agent_api_key_use_case=api_key_use_case, + agent_use_case=agent_use_case, + authorization_service=authorization, + agent_id="agent-1", + agent_name=None, + ) + + # The lookup happens BEFORE the authz check — name → id is resolved, + # then the wrap intercepts the denial. + api_key_use_case.get_by_agent_id_and_name.assert_awaited_once() + authorization.check.assert_awaited_once() + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.api_key("api-key-named") + + async def test_delete_by_name_handler_collapses_denial_to_404(self): + from src.api.routes.agent_api_keys import delete_agent_api_key_by_name + + agent_use_case = MagicMock() + agent_use_case.get = AsyncMock(return_value=MagicMock(id="agent-1")) + api_key_use_case = MagicMock() + api_key_use_case.get_by_agent_id_and_name = AsyncMock( + return_value=MagicMock(id="api-key-named") + ) + api_key_use_case.delete_by_agent_id_and_key_name = AsyncMock() + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + authorization.principal_context = MagicMock(account_id="acct-1") + + with pytest.raises(ItemDoesNotExist): + await delete_agent_api_key_by_name( + api_key_name="prod-key", + agent_api_key_use_case=api_key_use_case, + agent_use_case=agent_use_case, + authorization_service=authorization, + agent_id="agent-1", + agent_name=None, + ) + + # Crucially: the delete is NOT invoked when the check fails. + api_key_use_case.delete_by_agent_id_and_key_name.assert_not_called() + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["operation"] == AuthorizedOperationType.delete + + async def test_absent_and_denied_404_bodies_are_identical(self): + """Absent-row and denied-row 404 bodies must be byte-for-byte identical.""" + from src.api.routes.agent_api_keys import get_agent_api_key_by_name + from src.utils.agent_api_key_authorization import API_KEY_NOT_FOUND_MESSAGE + + agent_use_case = MagicMock() + agent_use_case.get = AsyncMock(return_value=MagicMock(id="agent-1")) + + # Path A: row absent. + absent_use_case = MagicMock() + absent_use_case.get_by_agent_id_and_name = AsyncMock(return_value=None) + with pytest.raises(ItemDoesNotExist) as absent_exc: + await get_agent_api_key_by_name( + name="prod-key", + agent_api_key_use_case=absent_use_case, + agent_use_case=agent_use_case, + authorization_service=MagicMock(), + agent_id="agent-1", + agent_name=None, + ) + + # Path B: row present, authz denied. + denied_use_case = MagicMock() + denied_use_case.get_by_agent_id_and_name = AsyncMock( + return_value=MagicMock(id="api-key-named") + ) + denied_authz = MagicMock() + denied_authz.check = AsyncMock(side_effect=AuthorizationError("denied")) + with pytest.raises(ItemDoesNotExist) as denied_exc: + await get_agent_api_key_by_name( + name="prod-key", + agent_api_key_use_case=denied_use_case, + agent_use_case=agent_use_case, + authorization_service=denied_authz, + agent_id="agent-1", + agent_name=None, + ) + + assert ( + str(absent_exc.value) == str(denied_exc.value) == API_KEY_NOT_FOUND_MESSAGE + ) + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestListFiltering: + """List filters to api_keys the caller has ``read`` on.""" + + async def test_authorized_ids_pushed_into_use_case(self): + """Route forwards ``authorized_api_key_ids`` as ``id=`` so the repo + filters at the SQL layer (correct pagination).""" + from src.api.routes.agent_api_keys import list_agent_api_keys + + agent_use_case = MagicMock() + agent_use_case.get = AsyncMock(return_value=MagicMock(id="agent-1")) + api_key_use_case = MagicMock() + api_key_use_case.list = AsyncMock(return_value=[]) + + await list_agent_api_keys( + agent_api_key_use_case=api_key_use_case, + agent_use_case=agent_use_case, + authorized_api_key_ids=["api-key-a", "api-key-c"], + agent_id="agent-1", + agent_name=None, + limit=50, + page_number=1, + ) + + api_key_use_case.list.assert_awaited_once_with( + agent_id="agent-1", + limit=50, + page_number=1, + id=["api-key-a", "api-key-c"], + ) + + async def test_none_authorized_ids_passes_through(self): + """``None`` (bypass) must pass through as ``id=None`` (no filter).""" + from src.api.routes.agent_api_keys import list_agent_api_keys + + agent_use_case = MagicMock() + agent_use_case.get = AsyncMock(return_value=MagicMock(id="agent-1")) + api_key_use_case = MagicMock() + api_key_use_case.list = AsyncMock(return_value=[]) + + await list_agent_api_keys( + agent_api_key_use_case=api_key_use_case, + agent_use_case=agent_use_case, + authorized_api_key_ids=None, + agent_id="agent-1", + agent_name=None, + limit=50, + page_number=1, + ) + + api_key_use_case.list.assert_awaited_once_with( + agent_id="agent-1", + limit=50, + page_number=1, + id=None, + ) + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestCreateParentAgentCheck: + """``create_api_key`` gates on parent ``agent.update`` (no api_key row yet).""" + + async def test_create_checks_parent_agent_update(self): + from src.api.routes.agent_api_keys import create_api_key + from src.api.schemas.agent_api_keys import CreateAPIKeyRequest + from src.domain.entities.agent_api_keys import AgentAPIKeyType + + agent_use_case = MagicMock() + agent_use_case.get = AsyncMock(return_value=MagicMock(id="agent-1")) + api_key_use_case = MagicMock() + api_key_use_case.get_by_agent_id_and_name = AsyncMock(return_value=None) + from datetime import datetime + + created_entity = MagicMock() + created_entity.id = "new-api-key" + created_entity.agent_id = "agent-1" + created_entity.created_at = datetime(2026, 1, 1) + created_entity.name = "prod-key" + created_entity.api_key_type = AgentAPIKeyType.EXTERNAL + api_key_use_case.create = AsyncMock(return_value=created_entity) + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + authorization.principal_context = MagicMock(account_id="acct-1") + + request = CreateAPIKeyRequest( + agent_id="agent-1", + agent_name=None, + name="prod-key", + api_key_type=AgentAPIKeyType.EXTERNAL, + api_key="secret-key-value", + ) + + await create_api_key( + request=request, + agent_api_key_use_case=api_key_use_case, + agent_use_case=agent_use_case, + authorization_service=authorization, + ) + + authorization.check.assert_awaited_once() + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.agent("agent-1") + assert called_kwargs["operation"] == AuthorizedOperationType.update + + async def test_create_denied_on_parent_agent_propagates_403(self): + """Create denial surfaces as 403 — no api_key exists yet, no leak possible.""" + from src.api.routes.agent_api_keys import create_api_key + from src.api.schemas.agent_api_keys import CreateAPIKeyRequest + from src.domain.entities.agent_api_keys import AgentAPIKeyType + + agent_use_case = MagicMock() + agent_use_case.get = AsyncMock(return_value=MagicMock(id="agent-1")) + api_key_use_case = MagicMock() + api_key_use_case.create = AsyncMock() + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + + request = CreateAPIKeyRequest( + agent_id="agent-1", + agent_name=None, + name="prod-key", + api_key_type=AgentAPIKeyType.EXTERNAL, + api_key="x", + ) + + with pytest.raises(AuthorizationError): + await create_api_key( + request=request, + agent_api_key_use_case=api_key_use_case, + agent_use_case=agent_use_case, + authorization_service=authorization, + ) + # Create is NOT invoked when parent check fails. + api_key_use_case.create.assert_not_called()