Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion agentex/src/utils/authorization_shortcuts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from fastapi import Depends, Path, Query, Request

from src.adapters.authorization.exceptions import AuthorizationError
from src.adapters.crud_store.exceptions import ItemDoesNotExist
from src.api.schemas.authorization_types import (
AgentexResource,
AgentexResourceType,
Expand All @@ -15,7 +17,10 @@
from src.domain.repositories.task_message_repository import DTaskMessageRepository
from src.domain.repositories.task_repository import DTaskRepository
from src.domain.repositories.task_state_repository import DTaskStateRepository
from src.domain.services.authorization_service import DAuthorizationService
from src.domain.services.authorization_service import (
AuthorizationService,
DAuthorizationService,
)
from src.utils.agent_api_key_authorization import _check_api_key_or_collapse_to_404
from src.utils.task_authorization import check_task_or_collapse_to_404

Expand All @@ -39,6 +44,29 @@ async def _get_parent_task_id(
return resource.task_id


async def _check_agent_or_collapse_to_404(
authorization: AuthorizationService,
agent_id: str,
operation: AuthorizedOperationType,
) -> None:
"""Check an agent resource; collapse any denial to 404.

Collapsing a denied check into 404 stops callers from distinguishing 403
(exists, no access) from 404 (absent) and thereby probing whether an agent
exists in another tenant. Mirrors the task/api_key collapse helpers.

TODO: fold this into a single generic collapse helper, and restore the
403/404 split once agents carry tenant scope at the data layer.
"""
try:
await authorization.check(
resource=AgentexResource.agent(agent_id),
operation=operation,
)
except AuthorizationError:
raise ItemDoesNotExist(f"Item with id '{agent_id}' does not exist.") from None


def DAuthorizedId(
resource_type: AgentexResourceType | TaskChildResourceType,
operation: AuthorizedOperationType = AuthorizedOperationType.read,
Expand Down Expand Up @@ -74,6 +102,9 @@ async def _ensure_authorized_id(
await _check_api_key_or_collapse_to_404(
authorization, resource_id, operation
)
elif resource_type == AgentexResourceType.agent:
# Collapse agent denials to 404 for the same discoverability reason.
await _check_agent_or_collapse_to_404(authorization, resource_id, operation)
else:
await authorization.check(
resource=AgentexResource(type=resource_type, selector=resource_id),
Expand Down Expand Up @@ -116,6 +147,8 @@ async def _ensure_authorized_query(
await check_task_or_collapse_to_404(authorization, task_id, operation)
elif resource_type == AgentexResourceType.task:
await check_task_or_collapse_to_404(authorization, resource_id, operation)
elif resource_type == AgentexResourceType.agent:
await _check_agent_or_collapse_to_404(authorization, resource_id, operation)
else:
await authorization.check(
resource=AgentexResource(type=resource_type, selector=resource_id),
Expand Down Expand Up @@ -200,6 +233,9 @@ async def _ensure_authorized_name(
# "present in another tenant" from "absent" (tasks.name is globally
# unique — any 403 leak here probes the whole system, not a tenant).
await check_task_or_collapse_to_404(authorization, resource.id, operation)
elif resource_type == AgentexResourceType.agent:
# Agents: same collapse, on the id resolved from the name above.
await _check_agent_or_collapse_to_404(authorization, resource.id, operation)
else:
await authorization.check(
resource=AgentexResource(type=resource_type, selector=resource.id),
Expand Down
263 changes: 263 additions & 0 deletions agentex/tests/unit/api/test_agents_authz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
"""Agent direct-route authorization: denied reads/deletes collapse to 404.

Asserts the route layer issues the right ``check`` calls and collapses a denial
to 404 (so callers can't probe cross-tenant existence), and that list filtering
forwards the authorized-id set to the use case.
"""

from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock

import pytest
from src.adapters.authorization.exceptions import AuthorizationError
from src.adapters.crud_store.exceptions import ItemDoesNotExist
from src.api.schemas.authorization_types import (
AgentexResource,
AgentexResourceType,
AuthorizedOperationType,
)
from src.utils.authorization_shortcuts import (
DAuthorizedId,
DAuthorizedName,
DAuthorizedQuery,
_check_agent_or_collapse_to_404,
)


def _dep_callable(annotation):
"""Pull the inner FastAPI dependency function out of an ``Annotated[str, Depends(...)]``."""
return annotation.__metadata__[0].dependency


@pytest.mark.unit
@pytest.mark.asyncio
class TestCheckAgentOrCollapseTo404:
"""Helper collapses every denial to 404 (no cross-tenant existence leak)."""

async def test_allowed_check_returns_normally(self):
authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

await _check_agent_or_collapse_to_404(
authorization,
"agent-1",
AuthorizedOperationType.read,
)

authorization.check.assert_awaited_once()
called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["resource"] == AgentexResource.agent("agent-1")
assert called_kwargs["operation"] == AuthorizedOperationType.read

async def test_denied_collapses_to_not_found(self):
authorization = MagicMock()
authorization.check = AsyncMock(side_effect=AuthorizationError("denied"))

with pytest.raises(ItemDoesNotExist):
await _check_agent_or_collapse_to_404(
authorization,
"agent-1",
AuthorizedOperationType.delete,
)

async def test_operation_forwarded_verbatim(self):
authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

await _check_agent_or_collapse_to_404(
authorization,
"agent-2",
AuthorizedOperationType.delete,
)

called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["operation"] == AuthorizedOperationType.delete


@pytest.mark.unit
@pytest.mark.asyncio
class TestDAuthorizedIdAgentWrap:
"""``DAuthorizedId(agent, ...)`` routes through the collapse wrap → 404 on denial."""

async def test_agent_id_returns_resource_id_when_allowed(self):
annotation = DAuthorizedId(
AgentexResourceType.agent, AuthorizedOperationType.read
)
dep = _dep_callable(annotation)

authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

result = await dep(
authorization, MagicMock(), MagicMock(), MagicMock(), "agent-9"
)

assert result == "agent-9"
called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["resource"] == AgentexResource.agent("agent-9")
assert called_kwargs["operation"] == AuthorizedOperationType.read

async def test_agent_id_routes_through_wrap_on_denial(self):
annotation = DAuthorizedId(
AgentexResourceType.agent, AuthorizedOperationType.read
)
dep = _dep_callable(annotation)

authorization = MagicMock()
authorization.check = AsyncMock(side_effect=AuthorizationError("denied"))

with pytest.raises(ItemDoesNotExist):
await dep(authorization, MagicMock(), MagicMock(), MagicMock(), "agent-7")

async def test_agent_delete_op_propagated_to_check(self):
annotation = DAuthorizedId(
AgentexResourceType.agent, AuthorizedOperationType.delete
)
dep = _dep_callable(annotation)

authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

await dep(authorization, MagicMock(), MagicMock(), MagicMock(), "agent-del")

called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["operation"] == AuthorizedOperationType.delete


@pytest.mark.unit
@pytest.mark.asyncio
class TestDAuthorizedNameAgentWrap:
"""``DAuthorizedName(agent, ...)``: lookup-then-collapse on the resolved id."""

async def test_present_but_denied_collapses_to_404(self):
annotation = DAuthorizedName(
AgentexResourceType.agent, AuthorizedOperationType.read
)
dep = _dep_callable(annotation)

agent_repository = MagicMock()
agent_repository.get = AsyncMock(return_value=MagicMock(id="agent-resolved"))
task_repository = MagicMock()
authorization = MagicMock()
authorization.check = AsyncMock(side_effect=AuthorizationError("denied"))

with pytest.raises(ItemDoesNotExist):
await dep(authorization, agent_repository, task_repository, "prod-agent")

# Name was resolved to an id BEFORE the authz check intercepted the denial.
agent_repository.get.assert_awaited_once_with(name="prod-agent")
called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["resource"] == AgentexResource.agent("agent-resolved")

async def test_absent_name_surfaces_native_404_without_checking(self):
annotation = DAuthorizedName(
AgentexResourceType.agent, AuthorizedOperationType.read
)
dep = _dep_callable(annotation)

agent_repository = MagicMock()
agent_repository.get = AsyncMock(side_effect=ItemDoesNotExist("absent"))
task_repository = MagicMock()
authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

with pytest.raises(ItemDoesNotExist):
await dep(authorization, agent_repository, task_repository, "missing-agent")

# Truly-absent name → repo's native 404, and no authz check is attempted.
authorization.check.assert_not_awaited()


@pytest.mark.unit
@pytest.mark.asyncio
class TestDAuthorizedQueryAgentWrap:
"""``DAuthorizedQuery(agent, ...)`` also collapses denied checks to 404."""

async def test_agent_query_returns_resource_id_when_allowed(self):
annotation = DAuthorizedQuery(
AgentexResourceType.agent, AuthorizedOperationType.read
)
dep = _dep_callable(annotation)

authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

result = await dep(
authorization, MagicMock(), MagicMock(), MagicMock(), "agent-query"
)

assert result == "agent-query"
called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["resource"] == AgentexResource.agent("agent-query")
assert called_kwargs["operation"] == AuthorizedOperationType.read

async def test_agent_query_routes_through_wrap_on_denial(self):
annotation = DAuthorizedQuery(
AgentexResourceType.agent, AuthorizedOperationType.read
)
dep = _dep_callable(annotation)

authorization = MagicMock()
authorization.check = AsyncMock(side_effect=AuthorizationError("denied"))

with pytest.raises(ItemDoesNotExist):
await dep(
authorization, MagicMock(), MagicMock(), MagicMock(), "agent-query"
)


@pytest.mark.unit
@pytest.mark.asyncio
class TestListFiltering:
"""List forwards the authorized-id set to the use case (SQL-layer filtering)."""

async def test_authorized_ids_pushed_into_use_case(self):
from src.api.routes.agents import list_agents

agents_use_case = MagicMock()
agents_use_case.list = AsyncMock(return_value=[])

await list_agents(
agents_use_case=agents_use_case,
_authorized_ids=["agent-a", "agent-c"],
task_id=None,
limit=50,
page_number=1,
order_by=None,
order_direction="desc",
)

agents_use_case.list.assert_awaited_once_with(
task_id=None,
limit=50,
page_number=1,
order_by=None,
order_direction="desc",
id=["agent-a", "agent-c"],
)

async def test_none_authorized_ids_passes_through_unfiltered(self):
from src.api.routes.agents import list_agents

agents_use_case = MagicMock()
agents_use_case.list = AsyncMock(return_value=[])

await list_agents(
agents_use_case=agents_use_case,
_authorized_ids=None,
task_id=None,
limit=50,
page_number=1,
order_by=None,
order_direction="desc",
)

# ``None`` (authz bypass / disabled) means no id filter is applied.
agents_use_case.list.assert_awaited_once_with(
task_id=None,
limit=50,
page_number=1,
order_by=None,
order_direction="desc",
)
16 changes: 11 additions & 5 deletions agentex/tests/unit/api/test_schedules_authz.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ class TestCreateParentAgentCheck:
"""``create_schedule`` is the only route where no schedule resource exists
yet, so the authorization service cannot transitively gate on it. The
route's ``agent_id`` guard MUST check ``agent.update`` on the parent, and a
denial propagates as 403 (no schedule whose existence could leak)."""
denial collapses to 404 — like every other agent check — so the schedules
endpoint can't be used to probe cross-tenant agent existence."""

@staticmethod
def _agent_id_dep():
Expand All @@ -215,16 +216,21 @@ async def test_create_checks_parent_agent_update(self):
assert called_kwargs["resource"] == AgentexResource.agent("agent-1")
assert called_kwargs["operation"] == AuthorizedOperationType.update

async def test_create_denied_propagates_403_not_collapsed(self):
async def test_create_denied_collapses_to_404(self):
dep = self._agent_id_dep()

authorization = MagicMock()
authorization.check = AsyncMock(side_effect=AuthorizationError("denied"))

# Direct agent denial bubbles out as AuthorizationError (→ 403), NOT 404.
with pytest.raises(AuthorizationError):
# Parent-agent denial collapses to 404 so creating a schedule under an
# agent in another tenant can't reveal that the agent exists.
with pytest.raises(ItemDoesNotExist):
await dep(
authorization, MagicMock(), MagicMock(), MagicMock(), resource_id="agent-1"
authorization,
MagicMock(),
MagicMock(),
MagicMock(),
resource_id="agent-1",
)


Expand Down
Loading
Loading