Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions agentex/src/api/routes/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,28 @@
router = APIRouter(prefix="/agents", tags=["Agents"])


def _has_resolvable_creator(principal_context) -> bool:
"""Whether a creator identity (user or service account) is present.

``/agents/register`` is whitelisted, so deployed pods self-register on
startup with no principal context (``None``). The agent already exists and
is owned from build time (``register-build`` runs before deploy-time
register), so the create authorization check and ownership grant don't
apply on that path -- and forwarding a ``None`` principal to agentex-auth
raises 422, crash-looping the pod. Skip the check/grant when no creator is
resolvable; enforce normally for an authenticated caller.
"""
if isinstance(principal_context, dict):
return bool(
principal_context.get("user_id")
or principal_context.get("service_account_id")
)
return bool(
getattr(principal_context, "user_id", None)
or getattr(principal_context, "service_account_id", None)
)
Comment on lines +64 to +72

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 getattr branch is never exercised with a truthy result in tests

_has_resolvable_creator has two branches: a dict path (lines 64–68) and a getattr path (lines 69–72). The tests cover None → getattr → False and {"user_id": "u"} → dict → True, but there is no test exercising getattr(..., "user_id") → True (i.e., a non-dict object with a user_id attribute). Since AgentexAuthPrincipalContext = Any and the production path may return a Pydantic model or another non-dict object from the auth gateway, the branch that would actually enforce authz for service-account-authenticated callers in production is untested. A test passing a MagicMock with user_id = "sa-1" set would close this gap.

Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/api/routes/agents.py
Line: 64-72

Comment:
**`getattr` branch is never exercised with a truthy result in tests**

`_has_resolvable_creator` has two branches: a `dict` path (lines 64–68) and a `getattr` path (lines 69–72). The tests cover `None → getattr → False` and `{"user_id": "u"} → dict → True`, but there is no test exercising `getattr(..., "user_id") → True` (i.e., a non-dict object with a `user_id` attribute). Since `AgentexAuthPrincipalContext = Any` and the production path may return a Pydantic model or another non-dict object from the auth gateway, the branch that would actually enforce authz for service-account-authenticated callers in production is untested. A test passing a `MagicMock` with `user_id = "sa-1"` set would close this gap.

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Fix in Cursor Fix in Claude Code Fix in Codex



@router.get(
"/{agent_id}",
summary="Get Agent by ID",
Expand Down Expand Up @@ -173,10 +195,14 @@ async def register_agent(
If agent_id is not provided, the system will look for an existing agent by name and update it,
or create a new one if it doesn't exist.
"""
await authorization_service.check(
AgentexResource.agent("*"),
AuthorizedOperationType.create,
enforce_ownership = _has_resolvable_creator(
authorization_service.principal_context
)
if enforce_ownership:
await authorization_service.check(
AgentexResource.agent("*"),
AuthorizedOperationType.create,
)
logger.info(f"Registering agent: {request}")
try:
agent_entity = await agents_use_case.register_agent(
Expand All @@ -188,9 +214,10 @@ async def register_agent(
registration_metadata=request.registration_metadata,
agent_input_type=request.agent_input_type,
)
await authorization_service.grant(
AgentexResource.agent(agent_entity.id),
)
if enforce_ownership:
await authorization_service.grant(
AgentexResource.agent(agent_entity.id),
)
response_fields = agent_entity.model_dump()
existing_key = await api_keys_use_case.get_internal_api_key_by_agent_id(
agent_id=agent_entity.id
Expand Down
23 changes: 8 additions & 15 deletions agentex/tests/integration/api/agents/test_agents_auth_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ async def test_agent_check(
"src.utils.http_request_handler.HttpRequestHandler.post_with_error_handling",
side_effect=_mock_post_with_error_handling,
)
async def test_agent_create_and_delete_gate_and_write_legacy_authz(
async def test_agent_register_skips_authz_and_delete_gates_legacy_authz(
self,
post_with_error_handling_mock,
is_enabled_mock,
Expand All @@ -214,20 +214,13 @@ def _payloads(path: str):
assert response.status_code == 200
agent = response.json()

create_checks = _payloads("/v1/authz/check")
assert len(create_checks) == 1
assert create_checks[0]["resource"]["type"] == AgentexResourceType.agent.value
assert create_checks[0]["resource"]["selector"] == "*"
assert create_checks[0]["operation"] == "create"

agent_grants = [
payload
for payload in _payloads("/v1/authz/grant")
if payload["resource"]["type"] == AgentexResourceType.agent.value
]
assert len(agent_grants) == 1
assert agent_grants[0]["resource"]["selector"] == agent["id"]
assert agent_grants[0]["operation"] == "create"
# /agents/register is whitelisted: deployed pods self-register on
# startup without a principal, so register does not gate on `create`
# or write an ownership grant -- the agent is already owned from build
# time (register-build runs first). Forwarding a None principal here
# previously 422'd and crash-looped the pod.
assert _payloads("/v1/authz/check") == []
assert _payloads("/v1/authz/grant") == []

post_with_error_handling_mock.reset_mock()
response = await isolated_client.delete("/agents/name/test-agent")
Expand Down
88 changes: 88 additions & 0 deletions agentex/tests/unit/api/test_agents_authz.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@

from __future__ import annotations

from datetime import UTC, datetime
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock

import pytest
from src.adapters.authorization.exceptions import AuthorizationError
from src.adapters.crud_store.exceptions import ItemDoesNotExist
from src.api.routes.agents import register_agent
from src.api.schemas.agents import RegisterAgentRequest
from src.api.schemas.authorization_types import (
AgentexResource,
AgentexResourceType,
AuthorizedOperationType,
)
from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus
from src.utils.authorization_shortcuts import (
DAuthorizedId,
DAuthorizedName,
Expand Down Expand Up @@ -261,3 +266,86 @@ async def test_none_authorized_ids_passes_through_unfiltered(self):
order_by=None,
order_direction="desc",
)


@pytest.mark.unit
@pytest.mark.asyncio
class TestRegisterAgentOwnershipEnforcement:
"""``/agents/register`` is whitelisted, so deployed pods self-register with
no principal. The create check / ownership grant must be skipped on that
path (the agent is already owned from build time), so a ``None`` principal
does not 422 and crash-loop the pod. An authenticated caller is enforced."""

@staticmethod
def _mocks(principal_context):
authorization = MagicMock()
authorization.principal_context = principal_context
authorization.check = AsyncMock(return_value=True)
authorization.grant = AsyncMock()

now = datetime.now(UTC)
agent = AgentEntity(
id="agent-1",
name="my-agent",
description="d",
acp_type=ACPType.ASYNC,
status=AgentStatus.READY,
acp_url="http://agent:5000",
created_at=now,
updated_at=now,
)
agents_use_case = MagicMock()
agents_use_case.register_agent = AsyncMock(return_value=agent)

api_keys_use_case = MagicMock()
existing_key = MagicMock()
existing_key.api_key = "internal-key"
api_keys_use_case.get_internal_api_key_by_agent_id = AsyncMock(
return_value=existing_key
)
return authorization, agents_use_case, api_keys_use_case

@staticmethod
def _request():
return RegisterAgentRequest(
name="my-agent",
description="d",
acp_url="http://agent:5000",
acp_type=ACPType.ASYNC,
)

@pytest.mark.parametrize("principal_context", [None, {}, {"account_id": "acct"}])
async def test_unresolvable_creator_skips_check_and_grant(self, principal_context):
# None (whitelisted self-reg), an empty dict, or a principal with no
# user_id/service_account_id all mean "no creator" -> skip, don't 422.
authz, use_case, api_keys = self._mocks(principal_context=principal_context)

resp = await register_agent(self._request(), use_case, authz, api_keys)

authz.check.assert_not_awaited()
authz.grant.assert_not_awaited()
use_case.register_agent.assert_awaited_once()
assert resp.agent_api_key == "internal-key"

async def test_dict_principal_enforces_check_and_grant(self):
authz, use_case, api_keys = self._mocks(
principal_context={"user_id": "u", "account_id": "acct"}
)

await register_agent(self._request(), use_case, authz, api_keys)

authz.check.assert_awaited_once()
authz.grant.assert_awaited_once()

async def test_object_principal_enforces_check_and_grant(self):
# Object-shaped principal exercises the getattr branch of the helper.
authz, use_case, api_keys = self._mocks(
principal_context=SimpleNamespace(
user_id="u", service_account_id=None, account_id="acct"
)
)

await register_agent(self._request(), use_case, authz, api_keys)

authz.check.assert_awaited_once()
authz.grant.assert_awaited_once()
Loading