From f0c7c1b1625cf2cfc6f2c39831afa82206205291 Mon Sep 17 00:00:00 2001 From: Asher Fink Date: Tue, 2 Jun 2026 22:54:37 -0400 Subject: [PATCH] feat(agents): collapse denied agent route access to 404 instead of 403 Direct agent-by-id and agent-by-name routes now return 404 instead of 403 when the caller isn't authorized for the agent, matching the existing behavior for tasks and api keys. A 403 on a specific id or name reveals that the agent exists, so collapsing denials to 404 stops callers from probing for agents in other tenants. The name routes resolve the name to an id first, so a genuinely missing name still surfaces the normal repository 404. Ticket: AGX1-242 --- agentex/src/utils/authorization_shortcuts.py | 38 ++- agentex/tests/unit/api/test_agents_authz.py | 263 ++++++++++++++++++ .../tests/unit/api/test_schedules_authz.py | 16 +- agentex/tests/unit/api/test_tasks_authz.py | 10 +- 4 files changed, 317 insertions(+), 10 deletions(-) create mode 100644 agentex/tests/unit/api/test_agents_authz.py diff --git a/agentex/src/utils/authorization_shortcuts.py b/agentex/src/utils/authorization_shortcuts.py index 395e6dc1..86768cbd 100644 --- a/agentex/src/utils/authorization_shortcuts.py +++ b/agentex/src/utils/authorization_shortcuts.py @@ -2,6 +2,8 @@ from fastapi import Depends, Path, Query, Request +from src.adapters.authorization.exceptions import AuthorizationError +from src.adapters.crud_store.exceptions import ItemDoesNotExist from src.api.schemas.authorization_types import ( AgentexResource, AgentexResourceType, @@ -15,7 +17,10 @@ from src.domain.repositories.task_message_repository import DTaskMessageRepository from src.domain.repositories.task_repository import DTaskRepository from src.domain.repositories.task_state_repository import DTaskStateRepository -from src.domain.services.authorization_service import DAuthorizationService +from src.domain.services.authorization_service import ( + AuthorizationService, + DAuthorizationService, +) from src.utils.agent_api_key_authorization import _check_api_key_or_collapse_to_404 from src.utils.task_authorization import check_task_or_collapse_to_404 @@ -39,6 +44,29 @@ async def _get_parent_task_id( return resource.task_id +async def _check_agent_or_collapse_to_404( + authorization: AuthorizationService, + agent_id: str, + operation: AuthorizedOperationType, +) -> None: + """Check an agent resource; collapse any denial to 404. + + Collapsing a denied check into 404 stops callers from distinguishing 403 + (exists, no access) from 404 (absent) and thereby probing whether an agent + exists in another tenant. Mirrors the task/api_key collapse helpers. + + TODO: fold this into a single generic collapse helper, and restore the + 403/404 split once agents carry tenant scope at the data layer. + """ + try: + await authorization.check( + resource=AgentexResource.agent(agent_id), + operation=operation, + ) + except AuthorizationError: + raise ItemDoesNotExist(f"Item with id '{agent_id}' does not exist.") from None + + def DAuthorizedId( resource_type: AgentexResourceType | TaskChildResourceType, operation: AuthorizedOperationType = AuthorizedOperationType.read, @@ -74,6 +102,9 @@ async def _ensure_authorized_id( await _check_api_key_or_collapse_to_404( authorization, resource_id, operation ) + elif resource_type == AgentexResourceType.agent: + # Collapse agent denials to 404 for the same discoverability reason. + await _check_agent_or_collapse_to_404(authorization, resource_id, operation) else: await authorization.check( resource=AgentexResource(type=resource_type, selector=resource_id), @@ -116,6 +147,8 @@ async def _ensure_authorized_query( await check_task_or_collapse_to_404(authorization, task_id, operation) elif resource_type == AgentexResourceType.task: await check_task_or_collapse_to_404(authorization, resource_id, operation) + elif resource_type == AgentexResourceType.agent: + await _check_agent_or_collapse_to_404(authorization, resource_id, operation) else: await authorization.check( resource=AgentexResource(type=resource_type, selector=resource_id), @@ -200,6 +233,9 @@ async def _ensure_authorized_name( # "present in another tenant" from "absent" (tasks.name is globally # unique — any 403 leak here probes the whole system, not a tenant). await check_task_or_collapse_to_404(authorization, resource.id, operation) + elif resource_type == AgentexResourceType.agent: + # Agents: same collapse, on the id resolved from the name above. + await _check_agent_or_collapse_to_404(authorization, resource.id, operation) else: await authorization.check( resource=AgentexResource(type=resource_type, selector=resource.id), diff --git a/agentex/tests/unit/api/test_agents_authz.py b/agentex/tests/unit/api/test_agents_authz.py new file mode 100644 index 00000000..cd6cd4e0 --- /dev/null +++ b/agentex/tests/unit/api/test_agents_authz.py @@ -0,0 +1,263 @@ +"""Agent direct-route authorization: denied reads/deletes collapse to 404. + +Asserts the route layer issues the right ``check`` calls and collapses a denial +to 404 (so callers can't probe cross-tenant existence), and that list filtering +forwards the authorized-id set to the use case. +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest +from src.adapters.authorization.exceptions import AuthorizationError +from src.adapters.crud_store.exceptions import ItemDoesNotExist +from src.api.schemas.authorization_types import ( + AgentexResource, + AgentexResourceType, + AuthorizedOperationType, +) +from src.utils.authorization_shortcuts import ( + DAuthorizedId, + DAuthorizedName, + DAuthorizedQuery, + _check_agent_or_collapse_to_404, +) + + +def _dep_callable(annotation): + """Pull the inner FastAPI dependency function out of an ``Annotated[str, Depends(...)]``.""" + return annotation.__metadata__[0].dependency + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestCheckAgentOrCollapseTo404: + """Helper collapses every denial to 404 (no cross-tenant existence leak).""" + + async def test_allowed_check_returns_normally(self): + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + await _check_agent_or_collapse_to_404( + authorization, + "agent-1", + AuthorizedOperationType.read, + ) + + authorization.check.assert_awaited_once() + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.agent("agent-1") + assert called_kwargs["operation"] == AuthorizedOperationType.read + + async def test_denied_collapses_to_not_found(self): + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + + with pytest.raises(ItemDoesNotExist): + await _check_agent_or_collapse_to_404( + authorization, + "agent-1", + AuthorizedOperationType.delete, + ) + + async def test_operation_forwarded_verbatim(self): + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + await _check_agent_or_collapse_to_404( + authorization, + "agent-2", + AuthorizedOperationType.delete, + ) + + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["operation"] == AuthorizedOperationType.delete + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestDAuthorizedIdAgentWrap: + """``DAuthorizedId(agent, ...)`` routes through the collapse wrap → 404 on denial.""" + + async def test_agent_id_returns_resource_id_when_allowed(self): + annotation = DAuthorizedId( + AgentexResourceType.agent, AuthorizedOperationType.read + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + result = await dep( + authorization, MagicMock(), MagicMock(), MagicMock(), "agent-9" + ) + + assert result == "agent-9" + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.agent("agent-9") + assert called_kwargs["operation"] == AuthorizedOperationType.read + + async def test_agent_id_routes_through_wrap_on_denial(self): + annotation = DAuthorizedId( + AgentexResourceType.agent, AuthorizedOperationType.read + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + + with pytest.raises(ItemDoesNotExist): + await dep(authorization, MagicMock(), MagicMock(), MagicMock(), "agent-7") + + async def test_agent_delete_op_propagated_to_check(self): + annotation = DAuthorizedId( + AgentexResourceType.agent, AuthorizedOperationType.delete + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + await dep(authorization, MagicMock(), MagicMock(), MagicMock(), "agent-del") + + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["operation"] == AuthorizedOperationType.delete + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestDAuthorizedNameAgentWrap: + """``DAuthorizedName(agent, ...)``: lookup-then-collapse on the resolved id.""" + + async def test_present_but_denied_collapses_to_404(self): + annotation = DAuthorizedName( + AgentexResourceType.agent, AuthorizedOperationType.read + ) + dep = _dep_callable(annotation) + + agent_repository = MagicMock() + agent_repository.get = AsyncMock(return_value=MagicMock(id="agent-resolved")) + task_repository = MagicMock() + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + + with pytest.raises(ItemDoesNotExist): + await dep(authorization, agent_repository, task_repository, "prod-agent") + + # Name was resolved to an id BEFORE the authz check intercepted the denial. + agent_repository.get.assert_awaited_once_with(name="prod-agent") + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.agent("agent-resolved") + + async def test_absent_name_surfaces_native_404_without_checking(self): + annotation = DAuthorizedName( + AgentexResourceType.agent, AuthorizedOperationType.read + ) + dep = _dep_callable(annotation) + + agent_repository = MagicMock() + agent_repository.get = AsyncMock(side_effect=ItemDoesNotExist("absent")) + task_repository = MagicMock() + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + with pytest.raises(ItemDoesNotExist): + await dep(authorization, agent_repository, task_repository, "missing-agent") + + # Truly-absent name → repo's native 404, and no authz check is attempted. + authorization.check.assert_not_awaited() + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestDAuthorizedQueryAgentWrap: + """``DAuthorizedQuery(agent, ...)`` also collapses denied checks to 404.""" + + async def test_agent_query_returns_resource_id_when_allowed(self): + annotation = DAuthorizedQuery( + AgentexResourceType.agent, AuthorizedOperationType.read + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + result = await dep( + authorization, MagicMock(), MagicMock(), MagicMock(), "agent-query" + ) + + assert result == "agent-query" + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.agent("agent-query") + assert called_kwargs["operation"] == AuthorizedOperationType.read + + async def test_agent_query_routes_through_wrap_on_denial(self): + annotation = DAuthorizedQuery( + AgentexResourceType.agent, AuthorizedOperationType.read + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + + with pytest.raises(ItemDoesNotExist): + await dep( + authorization, MagicMock(), MagicMock(), MagicMock(), "agent-query" + ) + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestListFiltering: + """List forwards the authorized-id set to the use case (SQL-layer filtering).""" + + async def test_authorized_ids_pushed_into_use_case(self): + from src.api.routes.agents import list_agents + + agents_use_case = MagicMock() + agents_use_case.list = AsyncMock(return_value=[]) + + await list_agents( + agents_use_case=agents_use_case, + _authorized_ids=["agent-a", "agent-c"], + task_id=None, + limit=50, + page_number=1, + order_by=None, + order_direction="desc", + ) + + agents_use_case.list.assert_awaited_once_with( + task_id=None, + limit=50, + page_number=1, + order_by=None, + order_direction="desc", + id=["agent-a", "agent-c"], + ) + + async def test_none_authorized_ids_passes_through_unfiltered(self): + from src.api.routes.agents import list_agents + + agents_use_case = MagicMock() + agents_use_case.list = AsyncMock(return_value=[]) + + await list_agents( + agents_use_case=agents_use_case, + _authorized_ids=None, + task_id=None, + limit=50, + page_number=1, + order_by=None, + order_direction="desc", + ) + + # ``None`` (authz bypass / disabled) means no id filter is applied. + agents_use_case.list.assert_awaited_once_with( + task_id=None, + limit=50, + page_number=1, + order_by=None, + order_direction="desc", + ) diff --git a/agentex/tests/unit/api/test_schedules_authz.py b/agentex/tests/unit/api/test_schedules_authz.py index 3d52e8d9..87239ca0 100644 --- a/agentex/tests/unit/api/test_schedules_authz.py +++ b/agentex/tests/unit/api/test_schedules_authz.py @@ -190,7 +190,8 @@ class TestCreateParentAgentCheck: """``create_schedule`` is the only route where no schedule resource exists yet, so the authorization service cannot transitively gate on it. The route's ``agent_id`` guard MUST check ``agent.update`` on the parent, and a - denial propagates as 403 (no schedule whose existence could leak).""" + denial collapses to 404 — like every other agent check — so the schedules + endpoint can't be used to probe cross-tenant agent existence.""" @staticmethod def _agent_id_dep(): @@ -215,16 +216,21 @@ async def test_create_checks_parent_agent_update(self): assert called_kwargs["resource"] == AgentexResource.agent("agent-1") assert called_kwargs["operation"] == AuthorizedOperationType.update - async def test_create_denied_propagates_403_not_collapsed(self): + async def test_create_denied_collapses_to_404(self): dep = self._agent_id_dep() authorization = MagicMock() authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) - # Direct agent denial bubbles out as AuthorizationError (→ 403), NOT 404. - with pytest.raises(AuthorizationError): + # Parent-agent denial collapses to 404 so creating a schedule under an + # agent in another tenant can't reveal that the agent exists. + with pytest.raises(ItemDoesNotExist): await dep( - authorization, MagicMock(), MagicMock(), MagicMock(), resource_id="agent-1" + authorization, + MagicMock(), + MagicMock(), + MagicMock(), + resource_id="agent-1", ) diff --git a/agentex/tests/unit/api/test_tasks_authz.py b/agentex/tests/unit/api/test_tasks_authz.py index 7f497734..0a4d0ccb 100644 --- a/agentex/tests/unit/api/test_tasks_authz.py +++ b/agentex/tests/unit/api/test_tasks_authz.py @@ -470,9 +470,11 @@ async def test_task_name_returns_resource_name_when_allowed(self): # The selector is the resolved row id, not the user-provided name. assert called_kwargs["resource"] == AgentexResource.task("task-allow") - async def test_non_task_name_skips_wrap(self): - """Agent FGAC is out of scope for AGX1-264 — the wrap must NOT extend - to agent name-routes, which keep the existing 403-on-deny behavior.""" + async def test_agent_name_collapses_to_404(self): + """Agent name-routes collapse a denial to 404 through their own wrap, + like tasks. Full agent coverage lives in test_agents_authz.py; here we + only guard that the shared name dependency routes agents through a + collapse wrap rather than the plain check that surfaces a denial as 403.""" annotation = DAuthorizedName( AgentexResourceType.agent, AuthorizedOperationType.read ) @@ -484,5 +486,5 @@ async def test_non_task_name_skips_wrap(self): agent_repository.get = AsyncMock(return_value=MagicMock(id="agent-1")) task_repository = MagicMock() - with pytest.raises(AuthorizationError): + with pytest.raises(ItemDoesNotExist): await dep(authorization, agent_repository, task_repository, "agent-name")