diff --git a/agentex/openapi.yaml b/agentex/openapi.yaml index b1ededea..b122020e 100644 --- a/agentex/openapi.yaml +++ b/agentex/openapi.yaml @@ -5221,12 +5221,6 @@ components: acp_type: $ref: '#/components/schemas/ACPType' description: The type of ACP to use for the agent. - principal_context: - anyOf: - - {} - - type: 'null' - title: Principal Context - description: Principal used for authorization registration_metadata: anyOf: - additionalProperties: true @@ -5336,12 +5330,6 @@ components: type: string title: Description description: The description of the agent. - principal_context: - anyOf: - - {} - - type: 'null' - title: Principal Context - description: Principal used for authorization registration_metadata: anyOf: - additionalProperties: true diff --git a/agentex/src/api/routes/agents.py b/agentex/src/api/routes/agents.py index ac2cf13a..89578542 100644 --- a/agentex/src/api/routes/agents.py +++ b/agentex/src/api/routes/agents.py @@ -76,16 +76,9 @@ async def get_agent_by_name( AgentexResourceType.agent, AuthorizedOperationType.read ), agents_use_case: DAgentsUseCase, - authorization: DAuthorizationService, ): """Get an agent by its unique name.""" agent_entity = await agents_use_case.get(name=agent_name) - - await authorization.check( - resource=AgentexResource.agent(agent_entity.id), - operation=AuthorizedOperationType.read, - ) - return Agent.model_validate(agent_entity) @@ -183,7 +176,6 @@ async def register_agent( await authorization_service.check( AgentexResource.agent("*"), AuthorizedOperationType.create, - principal_context=request.principal_context, ) logger.info(f"Registering agent: {request}") try: @@ -198,7 +190,6 @@ async def register_agent( ) await authorization_service.grant( AgentexResource.agent(agent_entity.id), - principal_context=request.principal_context, ) response_fields = agent_entity.model_dump() existing_key = await api_keys_use_case.get_internal_api_key_by_agent_id( @@ -236,11 +227,10 @@ async def register_build( agents_use_case: DAgentsUseCase, authorization_service: DAuthorizationService, ) -> Agent: - """Create a build-only agent row and register its authz resource.""" + """Create a build-only agent row after create authorization.""" await authorization_service.check( AgentexResource.agent("*"), AuthorizedOperationType.create, - principal_context=request.principal_context, ) logger.info(f"Registering build for agent: {request.name}") try: @@ -252,9 +242,8 @@ async def register_build( ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) from e - await authorization_service.register_resource( + await authorization_service.grant( AgentexResource.agent(agent_entity.id), - principal_context=request.principal_context, ) return Agent.model_validate(agent_entity) diff --git a/agentex/src/api/schemas/agents.py b/agentex/src/api/schemas/agents.py index 76c5a743..40fd4273 100644 --- a/agentex/src/api/schemas/agents.py +++ b/agentex/src/api/schemas/agents.py @@ -88,9 +88,6 @@ class RegisterAgentRequest(BaseModel): description="Optional agent ID if the agent already exists and needs to be updated.", ) acp_type: ACPType = Field(..., description="The type of ACP to use for the agent.") - principal_context: Any | None = Field( - default=None, description="Principal used for authorization" - ) registration_metadata: dict[str, Any] | None = Field( default=None, description="The metadata for the agent's registration.", @@ -120,9 +117,6 @@ class RegisterBuildRequest(BaseModel): ..., pattern=r"^[a-z0-9-]+$", description="The unique name of the agent." ) description: str = Field(..., description="The description of the agent.") - principal_context: Any | None = Field( - default=None, description="Principal used for authorization" - ) registration_metadata: dict[str, Any] | None = Field( default=None, description="The metadata for the agent's build registration.", diff --git a/agentex/src/domain/services/schedule_service.py b/agentex/src/domain/services/schedule_service.py index 0f815b29..c6b3102d 100644 --- a/agentex/src/domain/services/schedule_service.py +++ b/agentex/src/domain/services/schedule_service.py @@ -84,13 +84,12 @@ async def create_schedule( else None ) - # Schedules have no Postgres row — Temporal is the store and the auth + # Schedules have no Postgres row: Temporal is the store and the auth # selector is the schedule id ({agent_id}--{schedule_name}). Register - # the auth tuple (with the parent_agent edge) BEFORE the Temporal write - # (fail-closed), and compensate if the Temporal create fails so we never - # leave an orphan tuple. The read-back below is intentionally OUTSIDE - # the compensation scope: a describe failure must not deregister a - # schedule that was actually created. + # before the Temporal write so an auth failure fails closed. If the + # Temporal create fails after registration, compensate with a deregister. + # The read-back below is intentionally outside the compensation scope + # because the schedule was already created. registered = await self._register_schedule_in_auth( schedule_id=schedule_id, agent_id=agent.id ) @@ -109,7 +108,7 @@ async def create_schedule( paused=request.paused, ) except Exception: - # Orphan-tuple guard: the tuple was written but the schedule never + # Orphan guard: the auth entry was written but the schedule never # landed in Temporal. Best-effort compensating deregister, then # re-raise the original error. if registered: @@ -122,17 +121,15 @@ async def create_schedule( async def _register_schedule_in_auth( self, *, schedule_id: str, agent_id: str ) -> bool: - """Register a new agent_schedule with the auth service, including the - parent_agent edge so permissions cascade from the owning agent. + """Register the schedule in the authorization graph before creating it. - Called BEFORE the Temporal write — a failure raises and prevents the - schedule from being created. Skipped with a warning when no usable - creator identity is available on the principal context (e.g. - agent-bypass / internal paths without an authenticated user); this is - the interim behavior until on-behalf-of-user identity is threaded. + The schedule is registered under its parent agent so permissions + cascade from the owning agent. Registering before the Temporal create + fails closed: an auth failure aborts the create, and the caller + compensates with a deregister if the Temporal create later fails. - Returns True when a tuple was actually registered (so the caller knows - whether a compensating deregister is warranted), False when skipped. + Returns True when the schedule was registered, or False when no creator + identity is resolvable and registration is skipped. """ principal_context = self.authorization_service.principal_context user_id = getattr(principal_context, "user_id", None) @@ -149,9 +146,8 @@ async def _register_schedule_in_auth( parent=AgentexResource.agent(agent_id), ) except Exception as exc: - # Fail closed: log + re-raise so the schedule is never created. logger.exception( - "Auth register_resource failed for agent_schedule; aborting create", + "Auth registration failed for agent_schedule; aborting create", extra={ "schedule_id": schedule_id, "agent_id": agent_id, @@ -162,12 +158,11 @@ async def _register_schedule_in_auth( return True async def _deregister_schedule_from_auth(self, *, schedule_id: str) -> None: - """Best-effort deregistration of a schedule's auth tuples. + """Best-effort removal of the schedule from the authorization graph. - ``deregister_resource`` removes the resource and all of its - relationships (owner, parent, grantees) atomically. Used both on delete - and as the compensating action when a Temporal create fails. Failures - are logged but never propagate. + Temporal is the source of truth for schedule existence. Once Temporal + delete succeeds, a deregister failure is logged but does not block the + delete response. """ try: await self.authorization_service.deregister_resource( @@ -175,7 +170,7 @@ async def _deregister_schedule_from_auth(self, *, schedule_id: str) -> None: ) except Exception as exc: logger.warning( - "Auth deregister failed for agent_schedule; tuple may be orphaned", + "Auth deregister failed for agent_schedule; entry may be orphaned", extra={ "schedule_id": schedule_id, "error_type": type(exc).__name__, @@ -392,7 +387,10 @@ def _description_to_response( # Decode bytes to string if possible try: import json - workflow_params.append(json.loads(arg.data.decode("utf-8"))) + + workflow_params.append( + json.loads(arg.data.decode("utf-8")) + ) except (json.JSONDecodeError, UnicodeDecodeError): workflow_params.append(str(arg.data)) else: @@ -430,7 +428,9 @@ def _description_to_response( if hasattr(info, "recent_actions") and info.recent_actions: # ScheduleActionResult has started_at (when action started) and scheduled_at (when it was scheduled) last_action = info.recent_actions[-1] - last_action_time = getattr(last_action, "started_at", None) or getattr(last_action, "scheduled_at", None) + last_action_time = getattr(last_action, "started_at", None) or getattr( + last_action, "scheduled_at", None + ) created_at: datetime | None = ( cast(datetime, info.create_time) if hasattr(info, "create_time") and info.create_time diff --git a/agentex/src/domain/services/task_service.py b/agentex/src/domain/services/task_service.py index b9f9f0ef..809dd954 100644 --- a/agentex/src/domain/services/task_service.py +++ b/agentex/src/domain/services/task_service.py @@ -64,11 +64,10 @@ async def create_task( Returns: Task containing the created task info """ - # Register in the authorization service before persisting: a registration - # failure aborts the request with no orphaned row. If the persist fails - # after a successful registration, the compensating deregister_resource - # below prevents a dangling authorization entry. Both calls are no-ops - # when the authorization service is disabled for this account. + # Register the task in the authorization graph before persisting: a + # registration failure aborts the request with no orphaned row. If the + # persist fails after a successful registration, the compensating + # deregister_resource below prevents a dangling authorization entry. task_entity = TaskEntity( id=orm_id(), name=task_name, diff --git a/agentex/src/domain/use_cases/agent_api_keys_use_case.py b/agentex/src/domain/use_cases/agent_api_keys_use_case.py index 0f50ab4d..d89d21be 100644 --- a/agentex/src/domain/use_cases/agent_api_keys_use_case.py +++ b/agentex/src/domain/use_cases/agent_api_keys_use_case.py @@ -112,14 +112,12 @@ async def _register_api_key_in_auth( api_key_id: str, agent_id: str, ) -> None: - """Register a new agent_api_key with the auth service, including the - parent_agent edge so permissions cascade from the owning agent. - - Called BEFORE the Postgres write — a failure raises and prevents the - row from being persisted, so there is no compensating action. Skipped - with a warning when no usable creator identity is available on the - principal context (e.g. internal-key creation paths without an - authenticated user). + """Register a new agent_api_key in the authorization graph before persist. + + The api key is registered under its parent agent so permissions cascade + from the owning agent. Registering before the Postgres row is created + fails closed: a failure aborts the create rather than leaving a + persisted api key that cannot be authorized. """ principal_context = self.authorization_service.principal_context user_id = getattr(principal_context, "user_id", None) @@ -139,9 +137,8 @@ async def _register_api_key_in_auth( parent=AgentexResource.agent(agent_id), ) except Exception as exc: - # Fail closed: log + re-raise so the Postgres row is never written. logger.exception( - "Auth register_resource failed for agent_api_key; aborting create", + "Auth registration failed for agent_api_key; aborting create", extra={ "api_key_id": api_key_id, "agent_id": agent_id, @@ -151,21 +148,19 @@ async def _register_api_key_in_auth( raise async def _deregister_api_key_from_auth(self, *, api_key_id: str) -> None: - """Best-effort deregistration of an api_key's auth tuples on delete. + """Best-effort removal of the api_key from the authorization graph. - ``deregister_resource`` removes the resource and all of its - relationships (owner, parent, grantees) atomically. Always invoked; - the authorization service decides how to route the call. Failures are - logged but do not block the delete. + Deletes treat Postgres as the source of truth for existence. Once the + row is gone, a deregister failure is logged but does not block the + delete response. """ try: await self.authorization_service.deregister_resource( resource=AgentexResource.api_key(api_key_id), ) except Exception as exc: - # Best-effort: log and continue. Postgres row already deleted. logger.warning( - "Auth deregister failed for agent_api_key; tuple may be orphaned", + "Auth deregister failed for agent_api_key; entry may be orphaned", extra={ "api_key_id": api_key_id, "error_type": type(exc).__name__, @@ -203,8 +198,8 @@ async def get_external_by_agent_id_and_key( ) async def delete(self, id: str) -> None: - # Pre-fetch so we skip both the delete and the deregister when the row - # never existed — no DB round-trip, no auth round-trip for a no-op. + # Pre-fetch so we skip both the delete and auth cleanup when the row + # never existed: no DB round-trip, no auth round-trip for a no-op. try: await self.agent_api_key_repo.get(id=id) except ItemDoesNotExist: diff --git a/agentex/src/domain/use_cases/agents_acp_use_case.py b/agentex/src/domain/use_cases/agents_acp_use_case.py index 98fe0bab..bc99111a 100644 --- a/agentex/src/domain/use_cases/agents_acp_use_case.py +++ b/agentex/src/domain/use_cases/agents_acp_use_case.py @@ -10,9 +10,7 @@ AuthenticationServiceUnavailableError, ) from src.adapters.crud_store.exceptions import ItemDoesNotExist -from src.api.schemas.authorization_types import ( - AgentexResource, -) +from src.api.schemas.authorization_types import AgentexResource from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus from src.domain.entities.agents_rpc import ( ACP_TYPE_TO_ALLOWED_RPC_METHODS, @@ -237,7 +235,7 @@ async def _execute_with_error_handling( raise e async def grant_with_retry(self, task: TaskEntity, attempts: int = 0) -> None: - """Grant authorization for a task with retry""" + """Grant ownership for a newly created task.""" try: await self.authorization_service.grant( resource=AgentexResource.task(task.id), @@ -250,11 +248,10 @@ async def grant_with_retry(self, task: TaskEntity, attempts: int = 0) -> None: ) await asyncio.sleep(delay) return await self.grant_with_retry(task, attempts + 1) - else: - logger.error( - f"Authentication service unavailable: {e}. Max retries reached." - ) - raise e from e + logger.error( + f"Authentication service unavailable: {e}. Max retries reached." + ) + raise e from e except Exception as e: logger.error(f"Error granting authorization for task {task.id}: {e}") await self.task_service.fail_task(task, str(e)) diff --git a/agentex/src/domain/use_cases/agents_use_case.py b/agentex/src/domain/use_cases/agents_use_case.py index f23f9c55..5853ae7d 100644 --- a/agentex/src/domain/use_cases/agents_use_case.py +++ b/agentex/src/domain/use_cases/agents_use_case.py @@ -8,6 +8,7 @@ from src.adapters.temporal.exceptions import ( TemporalWorkflowAlreadyExistsError, ) +from src.api.schemas.authorization_types import AgentexResource from src.config.environment_variables import EnvironmentVariables from src.domain.entities.agents import ACPType, AgentEntity, AgentInputType, AgentStatus from src.domain.entities.deployments import DeploymentEntity, DeploymentStatus @@ -16,6 +17,7 @@ DDeploymentHistoryRepository, ) from src.domain.repositories.deployment_repository import DDeploymentRepository +from src.domain.services.authorization_service import DAuthorizationService from src.temporal.workflows.healthcheck_workflow import HealthCheckWorkflow from src.utils.ids import orm_id from src.utils.logging import make_logger @@ -30,11 +32,54 @@ def __init__( deployment_history_repository: DDeploymentHistoryRepository, deployment_repository: DDeploymentRepository, temporal_adapter: DTemporalAdapter, + authorization_service: DAuthorizationService, ): self.agent_repo = agent_repository self.deployment_history_repo = deployment_history_repository self.deployment_repo = deployment_repository self.temporal_adapter = temporal_adapter + self.authorization_service = authorization_service + + async def _safe_deregister(self, agent_id: str) -> None: + """Best-effort removal of an agent from the authorization graph. + + Swallows and logs any failure so a compensating/post-delete deregister + never masks the in-flight error (create) or fails a delete that already + succeeded. + """ + try: + await self.authorization_service.deregister_resource( + AgentexResource.agent(agent_id) + ) + except Exception: + logger.exception( + "authorization deregister failed for agent %s; swallowed", agent_id + ) + + async def _register_in_auth(self, agent_id: str) -> bool: + """Register a newly created agent in the authorization graph. + + Called before the row is persisted so a failure aborts the create with no + orphaned row; the _safe_deregister compensation undoes it. Skipped with a + warning when no creator identity is resolvable on the principal context: + an agent has no parent edge, so the principal is the sole anchor for + ownership and there is nothing to attribute it to. Returns whether a + register call was actually made so compensation can avoid deregistering + a resource it never registered. + """ + principal_context = self.authorization_service.principal_context + user_id = getattr(principal_context, "user_id", None) + service_account_id = getattr(principal_context, "service_account_id", None) + if user_id is None and service_account_id is None: + logger.warning( + "Skipping authorization registration for agent: no creator resolvable", + extra={"agent_id": agent_id}, + ) + return False + await self.authorization_service.register_resource( + AgentexResource.agent(agent_id) + ) + return True async def register_agent( self, @@ -148,6 +193,11 @@ async def register_agent( registered_at=datetime.now(UTC), agent_input_type=agent_input_type, ) + # Record ownership in the authorization graph before persisting; a + # failure here aborts the create with no orphaned row. Only the + # genuine-create path registers — the update paths above must not, + # or re-registering would rewrite the owner to the current caller. + registered_in_auth = await self._register_in_auth(agent.id) # This is a problem only if multiple pods spin up and then make a request all at the same time. # In that case, the first pod will create the agent and the rest should succeed silently try: @@ -156,9 +206,16 @@ async def register_agent( logger.info( f"Agent {name} was likely created in parallel, skipping creation" ) - # Re-fetch the actual persisted agent so downstream code - # (complete_deployment_registration) uses the correct agent_id + # Parallel writer already created it; undo our ownership + # registration and re-fetch the persisted agent so downstream + # code (complete_deployment_registration) uses the correct agent_id + if registered_in_auth: + await self._safe_deregister(agent.id) agent = await self.agent_repo.get(name=name) + except Exception: + if registered_in_auth: + await self._safe_deregister(agent.id) + raise await self.complete_deployment_registration( agent, acp_url, registration_metadata ) @@ -205,6 +262,10 @@ async def register_build( registration_metadata=registration_metadata, agent_input_type=agent_input_type, ) + # Record ownership before persisting, same as register_agent's genuine + # create branch. The early return above for an existing agent means we + # only ever register on a true first-time build create. + registered_in_auth = await self._register_in_auth(agent.id) # If multiple builds for the same new agent race, the first wins and the # rest re-fetch the persisted row instead of erroring. try: @@ -213,7 +274,14 @@ async def register_build( logger.info( f"Agent {name} was likely created in parallel, returning existing" ) + # undo our ownership registration from _register_in_auth + if registered_in_auth: + await self._safe_deregister(agent.id) agent = await self.agent_repo.get(name=name) + except Exception: + if registered_in_auth: + await self._safe_deregister(agent.id) + raise return agent async def complete_deployment_registration( @@ -308,6 +376,10 @@ async def delete( agent.status = AgentStatus.DELETED agent.status_reason = "Agent deleted successfully" await self.agent_repo.update(agent) + # Best-effort: remove the agent from the authorization graph after the + # soft-delete. The route revokes ownership separately after delete + # succeeds. + await self._safe_deregister(agent.id) return agent async def list( diff --git a/agentex/tests/integration/api/agents/test_agents_auth_api.py b/agentex/tests/integration/api/agents/test_agents_auth_api.py index ea9a02de..fd8ac821 100644 --- a/agentex/tests/integration/api/agents/test_agents_auth_api.py +++ b/agentex/tests/integration/api/agents/test_agents_auth_api.py @@ -36,6 +36,10 @@ async def _mock_post_with_error_handling( return {"success": True} elif path == "/v1/authz/revoke": return {"success": True} + elif path == "/v1/authz/register": + return {"success": True} + elif path == "/v1/authz/deregister": + return {"success": True} raise Exception(f"Unknown path: {path}") @@ -184,14 +188,20 @@ async def test_agent_check( "src.utils.http_request_handler.HttpRequestHandler.post_with_error_handling", side_effect=_mock_post_with_error_handling, ) - async def test_agent_grant_revoke( + async def test_agent_create_and_delete_gate_and_write_legacy_authz( self, post_with_error_handling_mock, is_enabled_mock, is_enabled_authorization_mock, isolated_client, ): - # Registering a new agent will grant the agent permission + def _payloads(path: str): + return [ + call[1]["json"] + for call in post_with_error_handling_mock.call_args_list + if call[0][1] == path + ] + response = await isolated_client.post( "/agents/register", json={ @@ -204,56 +214,37 @@ async def test_agent_grant_revoke( assert response.status_code == 200 agent = response.json() - assert post_with_error_handling_mock.call_count == 2 - assert ( - post_with_error_handling_mock.call_args_list[0][0][1] == "/v1/authz/check" - ) - authz_data = post_with_error_handling_mock.call_args_list[0][1]["json"] - assert authz_data["resource"]["type"] == AgentexResourceType.agent.value - assert authz_data["resource"]["selector"] == "*" - assert authz_data["operation"] == "create" - assert ( - post_with_error_handling_mock.call_args_list[1][0][1] == "/v1/authz/grant" - ) - assert post_with_error_handling_mock.call_args_list[1][1]["json"][ - "resource" - ] == { - "type": AgentexResourceType.agent.value, - "selector": agent["id"], - } - assert ( - post_with_error_handling_mock.call_args_list[1][1]["json"]["operation"] - == "create" - ) + create_checks = _payloads("/v1/authz/check") + assert len(create_checks) == 1 + assert create_checks[0]["resource"]["type"] == AgentexResourceType.agent.value + assert create_checks[0]["resource"]["selector"] == "*" + assert create_checks[0]["operation"] == "create" + + agent_grants = [ + payload + for payload in _payloads("/v1/authz/grant") + if payload["resource"]["type"] == AgentexResourceType.agent.value + ] + assert len(agent_grants) == 1 + assert agent_grants[0]["resource"]["selector"] == agent["id"] + assert agent_grants[0]["operation"] == "create" - # Deleting the agent will revoke the agent permission post_with_error_handling_mock.reset_mock() response = await isolated_client.delete("/agents/name/test-agent") assert response.status_code == 200 - assert post_with_error_handling_mock.call_count == 3 - assert post_with_error_handling_mock.call_args_list[0][0][1] == "/v1/authn" - assert ( - post_with_error_handling_mock.call_args_list[1][0][1] == "/v1/authz/check" - ) - authz_data = post_with_error_handling_mock.call_args_list[1][1]["json"] - assert authz_data["resource"]["type"] == AgentexResourceType.agent.value - assert authz_data["resource"]["selector"] == agent["id"] - assert authz_data["operation"] == "delete" - assert authz_data["principal"] == MOCK_PRINCIPAL_CONTEXT - assert ( - post_with_error_handling_mock.call_args_list[2][0][1] == "/v1/authz/revoke" - ) - assert ( - post_with_error_handling_mock.call_args_list[2][1]["json"]["principal"] - == MOCK_PRINCIPAL_CONTEXT - ) - assert post_with_error_handling_mock.call_args_list[2][1]["json"][ - "resource" - ] == { - "type": AgentexResourceType.agent.value, - "selector": agent["id"], - } - assert ( - post_with_error_handling_mock.call_args_list[2][1]["json"]["operation"] - == "delete" - ) + + delete_checks = _payloads("/v1/authz/check") + assert len(delete_checks) == 1 + assert delete_checks[0]["resource"]["type"] == AgentexResourceType.agent.value + assert delete_checks[0]["resource"]["selector"] == agent["id"] + assert delete_checks[0]["operation"] == "delete" + assert delete_checks[0]["principal"] == MOCK_PRINCIPAL_CONTEXT + + agent_revokes = [ + payload + for payload in _payloads("/v1/authz/revoke") + if payload["resource"]["type"] == AgentexResourceType.agent.value + ] + assert len(agent_revokes) == 1 + assert agent_revokes[0]["resource"]["selector"] == agent["id"] + assert agent_revokes[0]["operation"] == "delete" diff --git a/agentex/tests/integration/fixtures/integration_client.py b/agentex/tests/integration/fixtures/integration_client.py index b7c2c76a..1e173506 100644 --- a/agentex/tests/integration/fixtures/integration_client.py +++ b/agentex/tests/integration/fixtures/integration_client.py @@ -397,6 +397,7 @@ def create_agents_use_case(): "deployment_history_repository" ], deployment_repository=isolated_repositories["deployment_repository"], + authorization_service=make_noop_authorization_service(), ) def create_agent_api_keys_use_case(): diff --git a/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py b/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py index 49e79aaa..77ed339f 100644 --- a/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py +++ b/agentex/tests/integration/services/test_agent_api_key_service_dual_write.py @@ -1,27 +1,18 @@ -"""Integration tests for AgentAPIKeysUseCase dual-write to the authorization service. - -scale-agentex calls ``register_resource`` / ``deregister_resource`` -unconditionally; per-account routing is owned by the authorization gateway, -so scale-agentex does NOT couple to the authorization flag service. - -- Create calls register_resource with parent=agent (the parent_agent edge - is load-bearing for the authorization cascade). -- Delete calls deregister_resource after the Postgres row is gone. -- Registration failure prevents row: when register_resource raises, the - api_key is NOT persisted. -- Deregister failure does not block delete: when deregister_resource - raises, the DB delete still completes and the failure is logged. -- No creator → no register: if neither user_id nor service_account_id is - resolvable, the dual-write is a no-op (logged) and the row still lands. +"""Integration tests for AgentAPIKeysUseCase authorization writes. -The tests intentionally mock the repository, authorization service, agent -repository, and HTTP client. The behaviour under test is the call sequencing -inside ``AgentAPIKeysUseCase`` — not the underlying persistence or authorization -cluster itself. +Agent API keys have no service layer, so the authorization-write sequencing is +colocated in ``AgentAPIKeysUseCase`` with the Postgres write: + +- Create registers the api_key in the authorization graph under parent=agent, + before the api_key row is persisted. +- Registration failure prevents row creation. +- Delete removes the Postgres row first, then deregisters best-effort. +- No creator identity means the registration is skipped and the row still lands. -Note on structural divergence from the task dual-write: tasks live behind -``AgentTaskService``; agent_api_keys have no service layer, so the dual-write -logic is colocated in ``AgentAPIKeysUseCase``. +The tests intentionally mock the repository, authorization service, agent +repository, and HTTP client. The behavior under test is the call sequencing +inside ``AgentAPIKeysUseCase``, not the underlying persistence or authorization +service itself. """ from __future__ import annotations @@ -50,7 +41,7 @@ def _agent() -> AgentEntity: return AgentEntity( id=agent_id, name=f"agent-{agent_id[:8]}", - description="dual-write test agent", + description="authorization-write test agent", status=AgentStatus.READY, acp_type=ACPType.SYNC, acp_url="http://test-acp", @@ -76,7 +67,7 @@ def _build_use_case( agent: AgentEntity | None = None, create_raises: Exception | None = None, monkeypatch: pytest.MonkeyPatch, -) -> tuple[AgentAPIKeysUseCase, Mock, AsyncMock, AsyncMock]: +) -> tuple[AgentAPIKeysUseCase, Mock, Mock]: sample_agent = agent or _agent() agent_repository = Mock() @@ -123,12 +114,7 @@ def _build_use_case( client=Mock(), authorization_service=authorization_service, ) - return ( - use_case, - agent_api_key_repository, - authorization_service.register_resource, - authorization_service.deregister_resource, - ) + return use_case, agent_api_key_repository, authorization_service @pytest.mark.asyncio @@ -137,7 +123,7 @@ async def test_create_api_key_calls_register_resource_with_parent( monkeypatch: pytest.MonkeyPatch, ) -> None: agent = _agent() - use_case, repo, register, _ = _build_use_case( + use_case, repo, authorization_service = _build_use_case( principal=_principal(user_id="user-A", account_id="acct-1"), agent=agent, monkeypatch=monkeypatch, @@ -150,19 +136,21 @@ async def test_create_api_key_calls_register_resource_with_parent( api_key="secret", ) - register.assert_awaited_once() - registered_resource: AgentexResource = register.await_args.kwargs["resource"] + authorization_service.register_resource.assert_awaited_once() + registered_resource: AgentexResource = ( + authorization_service.register_resource.await_args.kwargs["resource"] + ) assert registered_resource.type == AgentexResourceType.api_key assert registered_resource.selector == api_key.id - # parent_agent edge is load-bearing — without it the authorization cascade - # `read = ... & parent_agent->read & ...` fails closed for every reader. - registered_parent: AgentexResource = register.await_args.kwargs["parent"] + registered_parent: AgentexResource = ( + authorization_service.register_resource.await_args.kwargs["parent"] + ) + # parent_agent is load-bearing: without it the authorization cascade from + # the owning agent fails closed for readers. assert registered_parent is not None assert registered_parent.type == AgentexResourceType.agent assert registered_parent.selector == agent.id repo.create.assert_awaited_once() - # Sanity: the persisted entity itself; we don't persist creator audit - # columns in OSS scale-agentex (Harvey's review feedback on #248). assert api_key.id is not None @@ -171,7 +159,7 @@ async def test_create_api_key_calls_register_resource_with_parent( async def test_delete_api_key_calls_deregister_resource( monkeypatch: pytest.MonkeyPatch, ) -> None: - use_case, repo, _, deregister = _build_use_case( + use_case, repo, authorization_service = _build_use_case( principal=_principal(user_id="user-A", account_id="acct-1"), monkeypatch=monkeypatch, ) @@ -180,27 +168,29 @@ async def test_delete_api_key_calls_deregister_resource( await use_case.delete(id=api_key_id) repo.delete.assert_awaited_once_with(id=api_key_id) - deregister.assert_awaited_once() - deregistered_resource: AgentexResource = deregister.await_args.kwargs["resource"] + authorization_service.deregister_resource.assert_awaited_once() + deregistered_resource: AgentexResource = ( + authorization_service.deregister_resource.await_args.kwargs["resource"] + ) assert deregistered_resource.type == AgentexResourceType.api_key assert deregistered_resource.selector == api_key_id @pytest.mark.asyncio @pytest.mark.integration -async def test_create_api_key_grant_failure_prevents_db_row( +async def test_create_api_key_register_failure_prevents_db_row( monkeypatch: pytest.MonkeyPatch, ) -> None: - register_resource = AsyncMock(side_effect=RuntimeError("spark unavailable")) + register_resource = AsyncMock(side_effect=RuntimeError("authz unavailable")) agent = _agent() - use_case, repo, _, _ = _build_use_case( + use_case, repo, authorization_service = _build_use_case( principal=_principal(user_id="user-A", account_id="acct-1"), register_resource=register_resource, agent=agent, monkeypatch=monkeypatch, ) - with pytest.raises(RuntimeError, match="spark unavailable"): + with pytest.raises(RuntimeError, match="authz unavailable"): await use_case.create( name="k1", agent_id=agent.id, @@ -209,36 +199,36 @@ async def test_create_api_key_grant_failure_prevents_db_row( ) repo.create.assert_not_awaited() + authorization_service.deregister_resource.assert_not_awaited() @pytest.mark.asyncio @pytest.mark.integration -async def test_delete_api_key_revoke_failure_does_not_block_delete( +async def test_delete_api_key_deregister_failure_does_not_block_delete( monkeypatch: pytest.MonkeyPatch, ) -> None: - deregister = AsyncMock(side_effect=RuntimeError("spark unavailable")) - use_case, repo, _, deregister_ref = _build_use_case( + deregister = AsyncMock(side_effect=RuntimeError("authz unavailable")) + use_case, repo, authorization_service = _build_use_case( principal=_principal(user_id="user-A", account_id="acct-1"), deregister_resource=deregister, monkeypatch=monkeypatch, ) - # Should NOT raise. await use_case.delete(id=orm_id()) repo.delete.assert_awaited_once() - deregister_ref.assert_awaited_once() + authorization_service.deregister_resource.assert_awaited_once() @pytest.mark.asyncio @pytest.mark.integration -async def test_create_api_key_skips_grant_when_no_creator_resolvable( +async def test_create_api_key_skips_auth_writes_when_no_creator_resolvable( monkeypatch: pytest.MonkeyPatch, ) -> None: """If neither user_id nor service_account_id is available on the principal, - the dual-write is a no-op (logged) and the row still lands without a tuple.""" + the registration is skipped and the row still lands.""" agent = _agent() - use_case, repo, register, _ = _build_use_case( + use_case, repo, authorization_service = _build_use_case( principal=_principal(user_id=None, account_id="acct-1"), agent=agent, monkeypatch=monkeypatch, @@ -251,20 +241,19 @@ async def test_create_api_key_skips_grant_when_no_creator_resolvable( api_key="secret", ) - register.assert_not_called() + authorization_service.register_resource.assert_not_called() repo.create.assert_awaited_once() - # Sanity: the row landed even though we skipped the auth-side registration. assert api_key.id is not None @pytest.mark.asyncio @pytest.mark.integration -async def test_delete_by_agent_id_and_key_name_revokes_existing( +async def test_delete_by_agent_id_and_key_name_removes_auth_entries( monkeypatch: pytest.MonkeyPatch, ) -> None: agent = _agent() existing_id = orm_id() - use_case, repo, _, deregister = _build_use_case( + use_case, repo, authorization_service = _build_use_case( principal=_principal(user_id="user-A", account_id="acct-1"), agent=agent, monkeypatch=monkeypatch, @@ -286,8 +275,10 @@ async def test_delete_by_agent_id_and_key_name_revokes_existing( ) repo.delete_by_agent_id_and_key_name.assert_awaited_once() - deregister.assert_awaited_once() - deregistered_resource: AgentexResource = deregister.await_args.kwargs["resource"] + authorization_service.deregister_resource.assert_awaited_once() + deregistered_resource: AgentexResource = ( + authorization_service.deregister_resource.await_args.kwargs["resource"] + ) assert deregistered_resource.selector == existing_id @@ -297,15 +288,14 @@ async def test_delete_api_key_skips_when_row_does_not_exist( monkeypatch: pytest.MonkeyPatch, ) -> None: """When the api_key id doesn't exist, the pre-fetch raises and we early- - return — no DB delete, no auth deregister. Avoids round-trips on a no-op.""" - use_case, repo, _, deregister = _build_use_case( + return with no DB delete or auth cleanup.""" + use_case, repo, authorization_service = _build_use_case( principal=_principal(user_id="user-A", account_id="acct-1"), monkeypatch=monkeypatch, ) - # Override the default "row exists" sentinel. repo.get = AsyncMock(side_effect=ItemDoesNotExist("not found")) await use_case.delete(id=orm_id()) repo.delete.assert_not_called() - deregister.assert_not_called() + authorization_service.deregister_resource.assert_not_called() diff --git a/agentex/tests/integration/services/test_schedule_service_dual_write.py b/agentex/tests/integration/services/test_schedule_service_dual_write.py index bb10a0a0..b030f439 100644 --- a/agentex/tests/integration/services/test_schedule_service_dual_write.py +++ b/agentex/tests/integration/services/test_schedule_service_dual_write.py @@ -1,32 +1,23 @@ -"""Integration tests for ScheduleService dual-write to the authorization service. - -scale-agentex calls ``register_resource`` / ``deregister_resource`` -unconditionally; per-account routing is owned by the authorization gateway, -so scale-agentex does NOT couple to a feature-flag service. - -Schedule-specific shape (vs the agent_api_key dual-write): schedules have no -Postgres row — Temporal is the store and the auth selector is the schedule id -``{agent_id}--{schedule_name}``. The dual-write is therefore Temporal + authorization, -so the dual-write lives in ``ScheduleService`` (where the Temporal write is) -rather than the use case, and the compensation boundary is scoped to the -Temporal create only: - -- Create registers register_resource with parent=agent (the parent_agent edge - is load-bearing for the authorization cascade) BEFORE the Temporal create. -- Register failure prevents the Temporal create (fail-closed). -- A Temporal create failure after a successful register triggers a - compensating deregister (best-effort), then the original error re-raises. -- A post-create read-back (describe) failure does NOT compensate — the - schedule was actually created, so its tuple must survive. -- Delete calls deregister_resource after the Temporal delete; a deregister - failure does not block the delete. -- No creator → no register: if neither user_id nor service_account_id is - resolvable, the dual-write is a no-op (logged) and the schedule is still - created. +"""Integration tests for ScheduleService authorization writes. + +Schedules have no Postgres row: Temporal is the store and the auth selector is +``{agent_id}--{schedule_name}``. The authorization-write sequencing therefore +lives in ``ScheduleService`` next to the Temporal write: + +- Create registers the schedule in the authorization graph under parent=agent, + before the Temporal create. +- Registration failure prevents the Temporal create. +- A Temporal create failure after a successful registration triggers a + best-effort compensating deregister and re-raises the original Temporal error. +- A post-create read-back failure does not deregister, because the schedule was + actually created. +- Delete removes the Temporal schedule first, then deregisters best-effort. +- No creator identity means the registration is skipped and the schedule still + lands in Temporal. The tests mock the Temporal adapter and authorization service and stub the -post-create read-back; the behaviour under test is the call sequencing inside -``ScheduleService`` — not Temporal or the authorization cluster itself. +post-create read-back; the behavior under test is the call sequencing inside +``ScheduleService``, not Temporal or the authorization service itself. """ from __future__ import annotations @@ -56,7 +47,7 @@ def _agent() -> AgentEntity: return AgentEntity( id=agent_id, name=f"agent-{agent_id[:8]}", - description="dual-write test agent", + description="authorization-write test agent", status=AgentStatus.READY, acp_type=ACPType.SYNC, acp_url="http://test-acp", @@ -80,7 +71,7 @@ def _build_service( create_raises: Exception | None = None, delete_raises: Exception | None = None, get_schedule_raises: Exception | None = None, -) -> tuple[ScheduleService, Mock, AsyncMock, AsyncMock]: +) -> tuple[ScheduleService, Mock, Mock]: temporal_adapter = Mock() temporal_adapter.create_schedule = AsyncMock( side_effect=create_raises, return_value=None @@ -109,12 +100,7 @@ def _build_service( else: service.get_schedule = AsyncMock(return_value=Mock(spec=ScheduleResponse)) - return ( - service, - temporal_adapter, - authorization_service.register_resource, - authorization_service.deregister_resource, - ) + return service, temporal_adapter, authorization_service @pytest.mark.asyncio @@ -122,19 +108,23 @@ def _build_service( async def test_create_schedule_calls_register_resource_with_parent() -> None: agent = _agent() request = _request("nightly") - service, temporal_adapter, register, _ = _build_service( + service, temporal_adapter, authorization_service = _build_service( principal=_principal(user_id="user-A"), ) await service.create_schedule(agent, request) - register.assert_awaited_once() - registered_resource: AgentexResource = register.await_args.kwargs["resource"] + authorization_service.register_resource.assert_awaited_once() + registered_resource: AgentexResource = ( + authorization_service.register_resource.await_args.kwargs["resource"] + ) assert registered_resource.type == AgentexResourceType.schedule assert registered_resource.selector == build_schedule_id(agent.id, request.name) - # parent_agent edge is load-bearing — without it the authorization cascade - # `read = ... & parent_agent->read` fails closed for every reader. - registered_parent: AgentexResource = register.await_args.kwargs["parent"] + registered_parent: AgentexResource = ( + authorization_service.register_resource.await_args.kwargs["parent"] + ) + # parent_agent is load-bearing: without it the authorization cascade from + # the owning agent fails closed for readers. assert registered_parent is not None assert registered_parent.type == AgentexResourceType.agent assert registered_parent.selector == agent.id @@ -145,7 +135,7 @@ async def test_create_schedule_calls_register_resource_with_parent() -> None: @pytest.mark.integration async def test_delete_schedule_calls_deregister_resource() -> None: agent = _agent() - service, temporal_adapter, _, deregister = _build_service( + service, temporal_adapter, authorization_service = _build_service( principal=_principal(user_id="user-A"), ) @@ -153,8 +143,10 @@ async def test_delete_schedule_calls_deregister_resource() -> None: schedule_id = build_schedule_id(agent.id, "nightly") temporal_adapter.delete_schedule.assert_awaited_once_with(schedule_id) - deregister.assert_awaited_once() - deregistered_resource: AgentexResource = deregister.await_args.kwargs["resource"] + authorization_service.deregister_resource.assert_awaited_once() + deregistered_resource: AgentexResource = ( + authorization_service.deregister_resource.await_args.kwargs["resource"] + ) assert deregistered_resource.type == AgentexResourceType.schedule assert deregistered_resource.selector == schedule_id @@ -162,18 +154,18 @@ async def test_delete_schedule_calls_deregister_resource() -> None: @pytest.mark.asyncio @pytest.mark.integration async def test_create_schedule_register_failure_prevents_temporal_create() -> None: - register = AsyncMock(side_effect=RuntimeError("spark unavailable")) + register = AsyncMock(side_effect=RuntimeError("authz unavailable")) agent = _agent() - service, temporal_adapter, _, _ = _build_service( + service, temporal_adapter, authorization_service = _build_service( principal=_principal(user_id="user-A"), register_resource=register, ) - with pytest.raises(RuntimeError, match="spark unavailable"): + with pytest.raises(RuntimeError, match="authz unavailable"): await service.create_schedule(agent, _request()) - # Fail-closed: the Temporal create never runs when register fails. temporal_adapter.create_schedule.assert_not_awaited() + authorization_service.deregister_resource.assert_not_awaited() @pytest.mark.asyncio @@ -183,7 +175,7 @@ async def test_create_schedule_temporal_failure_triggers_compensating_deregister ): agent = _agent() request = _request("nightly") - service, _, register, deregister = _build_service( + service, _, authorization_service = _build_service( principal=_principal(user_id="user-A"), create_raises=RuntimeError("temporal down"), ) @@ -191,11 +183,12 @@ async def test_create_schedule_temporal_failure_triggers_compensating_deregister with pytest.raises(RuntimeError, match="temporal down"): await service.create_schedule(agent, request) - register.assert_awaited_once() - # Orphan-tuple guard: the tuple was registered but the Temporal create - # failed, so the tuple is compensated away. - deregister.assert_awaited_once() - compensated: AgentexResource = deregister.await_args.kwargs["resource"] + authorization_service.register_resource.assert_awaited_once() + # The schedule never landed in Temporal, so the auth entry is cleaned up. + authorization_service.deregister_resource.assert_awaited_once() + compensated: AgentexResource = ( + authorization_service.deregister_resource.await_args.kwargs["resource"] + ) assert compensated.type == AgentexResourceType.schedule assert compensated.selector == build_schedule_id(agent.id, request.name) @@ -203,11 +196,11 @@ async def test_create_schedule_temporal_failure_triggers_compensating_deregister @pytest.mark.asyncio @pytest.mark.integration async def test_create_schedule_readback_failure_does_not_compensate() -> None: - # The Temporal create SUCCEEDS but the post-create describe read-back fails. - # The schedule genuinely exists, so its tuple must NOT be compensated away — - # deregistering here would fail-close the owner out of their own schedule. + # The Temporal create succeeded but the post-create describe failed. The + # schedule genuinely exists, so the auth entry must survive the read-back + # error. agent = _agent() - service, temporal_adapter, register, deregister = _build_service( + service, temporal_adapter, authorization_service = _build_service( principal=_principal(user_id="user-A"), get_schedule_raises=RuntimeError("describe transient error"), ) @@ -216,42 +209,43 @@ async def test_create_schedule_readback_failure_does_not_compensate() -> None: await service.create_schedule(agent, _request()) temporal_adapter.create_schedule.assert_awaited_once() - register.assert_awaited_once() - deregister.assert_not_awaited() + authorization_service.register_resource.assert_awaited_once() + authorization_service.deregister_resource.assert_not_awaited() @pytest.mark.asyncio @pytest.mark.integration async def test_delete_schedule_deregister_failure_does_not_block_delete() -> None: - deregister = AsyncMock(side_effect=RuntimeError("spark unavailable")) + deregister = AsyncMock(side_effect=RuntimeError("authz unavailable")) agent = _agent() - service, temporal_adapter, _, _ = _build_service( + service, temporal_adapter, authorization_service = _build_service( principal=_principal(user_id="user-A"), deregister_resource=deregister, ) - # Best-effort: the deregister failure is swallowed, the delete completes. + # Best-effort cleanup: a deregister failure is swallowed after Temporal + # delete succeeds. await service.delete_schedule(agent.id, "nightly") temporal_adapter.delete_schedule.assert_awaited_once_with( build_schedule_id(agent.id, "nightly") ) - deregister.assert_awaited_once() + authorization_service.deregister_resource.assert_awaited_once() @pytest.mark.asyncio @pytest.mark.integration -async def test_create_schedule_no_creator_skips_register() -> None: +async def test_create_schedule_no_creator_skips_auth_writes() -> None: agent = _agent() request = _request("nightly") - # Neither user_id nor service_account_id — agent-bypass / internal path. - service, temporal_adapter, register, deregister = _build_service( + # Neither user_id nor service_account_id: internal paths still create the + # schedule, but there is no creator identity to register as owner. + service, temporal_adapter, authorization_service = _build_service( principal=_principal(user_id=None, service_account_id=None), ) await service.create_schedule(agent, request) - register.assert_not_awaited() - deregister.assert_not_awaited() - # The schedule is still created — the dual-write is a no-op, not a block. + authorization_service.register_resource.assert_not_awaited() + authorization_service.deregister_resource.assert_not_awaited() temporal_adapter.create_schedule.assert_awaited_once() diff --git a/agentex/tests/integration/use_cases/test_agent_authz_dual_write.py b/agentex/tests/integration/use_cases/test_agent_authz_dual_write.py new file mode 100644 index 00000000..00a1b0e0 --- /dev/null +++ b/agentex/tests/integration/use_cases/test_agent_authz_dual_write.py @@ -0,0 +1,418 @@ +"""Integration tests for agent authorization-graph lifecycle writes. + +- ``register_agent`` / ``register_build`` ``register_resource`` the agent (with no + parent — an agent's tenant edge is server-derived) *before* persisting the row, + but only on a genuine create. The update / name-already-exists paths must not + register, or the owner would be rewritten to the current caller. +- If the persist fails (or loses a duplicate race) after a successful register, + the create issues a compensating ``deregister_resource`` and re-raises / adopts + the existing row. Explicit ownership grant/revoke calls are route-level behavior. +- If registration was skipped because no creator was resolvable, failed creates + do not issue a compensating deregister for a resource that was never registered. +- ``delete`` ``deregister_resource``s *after* the soft-delete, best-effort: a + failure is swallowed so a delete that already succeeded never surfaces an error. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, Mock +from uuid import uuid4 + +import pytest +from src.adapters.crud_store.exceptions import DuplicateItemError, ItemDoesNotExist +from src.api.schemas.authorization_types import AgentexResource, AgentexResourceType +from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus +from src.domain.use_cases.agents_use_case import AgentsUseCase + +from tests.fixtures.services import make_noop_authorization_service + + +def _principal( + user_id: str | None = "user-A", service_account_id: str | None = None +) -> SimpleNamespace: + """Minimal stand-in for the auth principal context.""" + return SimpleNamespace(user_id=user_id, service_account_id=service_account_id) + + +def _build_use_case( + *, + agent_repository, + deployment_history_repository=None, + deployment_repository=None, + principal: SimpleNamespace | None = None, +) -> tuple[AgentsUseCase, Mock]: + authorization_service = make_noop_authorization_service() + # Resolvable creator by default so the create path registers; tests that + # exercise the no-creator guard pass a principal with no user/service account. + authorization_service.principal_context = ( + principal if principal is not None else _principal() + ) + use_case = AgentsUseCase( + agent_repository=agent_repository, + deployment_history_repository=deployment_history_repository or AsyncMock(), + deployment_repository=deployment_repository or AsyncMock(), + temporal_adapter=AsyncMock(), + authorization_service=authorization_service, + ) + return use_case, authorization_service + + +def _existing_agent(name: str) -> AgentEntity: + return AgentEntity( + id=str(uuid4()), + name=name, + description="existing agent", + status=AgentStatus.READY, + acp_type=ACPType.ASYNC, + acp_url="http://existing-acp", + ) + + +async def _agent_exists(agent_repository, agent_id: str) -> bool: + try: + await agent_repository.get(id=agent_id) + return True + except ItemDoesNotExist: + return False + + +@pytest.mark.integration +@pytest.mark.asyncio +class TestAgentRegisterOnCreate: + async def test_create_registers_before_persist_with_no_parent( + self, isolated_repositories + ): + agent_repo = isolated_repositories["agent_repository"] + use_case, authorization_service = _build_use_case( + agent_repository=agent_repo, + deployment_history_repository=isolated_repositories[ + "deployment_history_repository" + ], + deployment_repository=isolated_repositories["deployment_repository"], + ) + + # When register fires, the Postgres row must not exist yet — this is + # what makes a registration failure abort the request cleanly. + observed = {} + + async def _record_existence(resource, parent=None): + observed["row_exists_at_register"] = await _agent_exists( + agent_repo, resource.selector + ) + + authorization_service.register_resource.side_effect = _record_existence + + agent = await use_case.register_agent( + name=f"dw-create-{uuid4().hex[:8]}", + description="created via ownership-write test", + acp_url="http://new-acp", + ) + + assert observed["row_exists_at_register"] is False + assert await _agent_exists(agent_repo, agent.id) is True + + authorization_service.register_resource.assert_awaited_once() + call = authorization_service.register_resource.call_args + registered_resource: AgentexResource = call.args[0] + assert registered_resource.type == AgentexResourceType.agent + assert registered_resource.selector == agent.id + # An agent's tenant edge is server-derived from the account; no parent. + assert call.kwargs.get("parent") is None + assert len(call.args) == 1 + # Ownership is granted by the route after create succeeds. + authorization_service.grant.assert_not_awaited() + + async def test_register_failure_aborts_create_with_no_row( + self, isolated_repositories + ): + agent_repo = isolated_repositories["agent_repository"] + use_case, authorization_service = _build_use_case( + agent_repository=agent_repo, + deployment_history_repository=isolated_repositories[ + "deployment_history_repository" + ], + deployment_repository=isolated_repositories["deployment_repository"], + ) + authorization_service.register_resource.side_effect = RuntimeError("authz down") + + name = f"dw-register-fail-{uuid4().hex[:8]}" + with pytest.raises(RuntimeError): + await use_case.register_agent( + name=name, + description="should never persist", + acp_url="http://new-acp", + ) + + # No compensation needed: the persist never ran, so no row exists. + with pytest.raises(ItemDoesNotExist): + await agent_repo.get(name=name) + authorization_service.deregister_resource.assert_not_awaited() + + async def test_persist_failure_compensates_and_surfaces_original_error(self): + # Register succeeds, then the Postgres persist blows up. The create must + # deregister the just-registered agent (no orphan) and re-raise the + # ORIGINAL persist error, not any deregister error. + agent_repo = Mock() + agent_repo.get = AsyncMock(side_effect=ItemDoesNotExist("absent")) + agent_repo.create = AsyncMock(side_effect=RuntimeError("db down")) + use_case, authorization_service = _build_use_case(agent_repository=agent_repo) + + with pytest.raises(RuntimeError, match="db down"): + await use_case.register_agent( + name=f"dw-persist-fail-{uuid4().hex[:8]}", + description="persist fails", + acp_url="http://new-acp", + ) + + authorization_service.register_resource.assert_awaited_once() + # deregister compensates the register; ownership writes are route-level. + authorization_service.deregister_resource.assert_awaited_once() + authorization_service.grant.assert_not_awaited() + authorization_service.revoke.assert_not_awaited() + registered = authorization_service.register_resource.call_args.args[0] + compensated = authorization_service.deregister_resource.call_args.args[0] + assert compensated.type == AgentexResourceType.agent + assert compensated.selector == registered.selector + + async def test_duplicate_compensates_then_adopts_existing_row(self): + # A parallel writer wins the create. The use case deregisters its own + # registration and adopts the already-persisted row. + existing = _existing_agent(f"dw-dup-{uuid4().hex[:8]}") + agent_repo = Mock() + agent_repo.get = AsyncMock(side_effect=[ItemDoesNotExist("absent"), existing]) + agent_repo.create = AsyncMock(side_effect=DuplicateItemError("exists")) + use_case, authorization_service = _build_use_case(agent_repository=agent_repo) + + result = await use_case.register_agent( + name=existing.name, + description="dup create", + acp_url="http://new-acp", + ) + + assert result.id == existing.id + authorization_service.register_resource.assert_awaited_once() + authorization_service.deregister_resource.assert_awaited_once() + authorization_service.grant.assert_not_awaited() + authorization_service.revoke.assert_not_awaited() + registered = authorization_service.register_resource.call_args.args[0] + compensated = authorization_service.deregister_resource.call_args.args[0] + assert compensated.selector == registered.selector + + async def test_update_path_does_not_register(self): + # register_agent called with an agent_id is an update, not a create: it + # must NOT register, or ownership would be rewritten to the caller. + existing = _existing_agent(f"dw-update-{uuid4().hex[:8]}") + agent_repo = Mock() + agent_repo.get = AsyncMock(return_value=existing) + agent_repo.update = AsyncMock(return_value=existing) + use_case, authorization_service = _build_use_case(agent_repository=agent_repo) + + await use_case.register_agent( + name=existing.name, + description="updated description", + acp_url="http://updated-acp", + agent_id=existing.id, + ) + + authorization_service.register_resource.assert_not_awaited() + authorization_service.grant.assert_not_awaited() + + async def test_create_without_resolvable_creator_skips_register( + self, isolated_repositories + ): + # With no user or service account on the principal there is no owner to + # attribute, so the create persists the row but skips registration. + agent_repo = isolated_repositories["agent_repository"] + use_case, authorization_service = _build_use_case( + agent_repository=agent_repo, + deployment_history_repository=isolated_repositories[ + "deployment_history_repository" + ], + deployment_repository=isolated_repositories["deployment_repository"], + principal=_principal(user_id=None, service_account_id=None), + ) + + agent = await use_case.register_agent( + name=f"dw-no-creator-{uuid4().hex[:8]}", + description="no resolvable creator", + acp_url="http://new-acp", + ) + + authorization_service.register_resource.assert_not_awaited() + authorization_service.grant.assert_not_awaited() + assert await _agent_exists(agent_repo, agent.id) is True + + async def test_persist_failure_without_resolvable_creator_skips_compensation(self): + agent_repo = Mock() + agent_repo.get = AsyncMock(side_effect=ItemDoesNotExist("absent")) + agent_repo.create = AsyncMock(side_effect=RuntimeError("db down")) + use_case, authorization_service = _build_use_case( + agent_repository=agent_repo, + principal=_principal(user_id=None, service_account_id=None), + ) + + with pytest.raises(RuntimeError, match="db down"): + await use_case.register_agent( + name=f"dw-no-creator-fail-{uuid4().hex[:8]}", + description="persist fails after skipped register", + acp_url="http://new-acp", + ) + + authorization_service.register_resource.assert_not_awaited() + authorization_service.deregister_resource.assert_not_awaited() + authorization_service.grant.assert_not_awaited() + authorization_service.revoke.assert_not_awaited() + + +@pytest.mark.integration +@pytest.mark.asyncio +class TestAgentRegisterBuildOnCreate: + async def test_build_persist_failure_compensates_and_surfaces_original_error(self): + agent_repo = Mock() + agent_repo.get = AsyncMock(side_effect=ItemDoesNotExist("absent")) + agent_repo.create = AsyncMock(side_effect=RuntimeError("db down")) + use_case, authorization_service = _build_use_case(agent_repository=agent_repo) + + with pytest.raises(RuntimeError, match="db down"): + await use_case.register_build( + name=f"dw-build-persist-fail-{uuid4().hex[:8]}", + description="build persist fails", + ) + + authorization_service.register_resource.assert_awaited_once() + authorization_service.deregister_resource.assert_awaited_once() + authorization_service.grant.assert_not_awaited() + authorization_service.revoke.assert_not_awaited() + registered = authorization_service.register_resource.call_args.args[0] + compensated = authorization_service.deregister_resource.call_args.args[0] + assert compensated.type == AgentexResourceType.agent + assert compensated.selector == registered.selector + + async def test_build_duplicate_compensates_then_adopts_existing_row(self): + existing = _existing_agent(f"dw-build-dup-{uuid4().hex[:8]}") + agent_repo = Mock() + agent_repo.get = AsyncMock(side_effect=[ItemDoesNotExist("absent"), existing]) + agent_repo.create = AsyncMock(side_effect=DuplicateItemError("exists")) + use_case, authorization_service = _build_use_case(agent_repository=agent_repo) + + result = await use_case.register_build( + name=existing.name, + description="dup build create", + ) + + assert result.id == existing.id + authorization_service.register_resource.assert_awaited_once() + authorization_service.deregister_resource.assert_awaited_once() + authorization_service.grant.assert_not_awaited() + authorization_service.revoke.assert_not_awaited() + registered = authorization_service.register_resource.call_args.args[0] + compensated = authorization_service.deregister_resource.call_args.args[0] + assert compensated.selector == registered.selector + + async def test_build_without_resolvable_creator_skips_register( + self, isolated_repositories + ): + agent_repo = isolated_repositories["agent_repository"] + use_case, authorization_service = _build_use_case( + agent_repository=agent_repo, + deployment_history_repository=isolated_repositories[ + "deployment_history_repository" + ], + deployment_repository=isolated_repositories["deployment_repository"], + principal=_principal(user_id=None, service_account_id=None), + ) + + agent = await use_case.register_build( + name=f"dw-build-no-creator-{uuid4().hex[:8]}", + description="build no resolvable creator", + ) + + authorization_service.register_resource.assert_not_awaited() + authorization_service.grant.assert_not_awaited() + assert await _agent_exists(agent_repo, agent.id) is True + + async def test_build_persist_failure_without_resolvable_creator_skips_compensation( + self, + ): + agent_repo = Mock() + agent_repo.get = AsyncMock(side_effect=ItemDoesNotExist("absent")) + agent_repo.create = AsyncMock(side_effect=RuntimeError("db down")) + use_case, authorization_service = _build_use_case( + agent_repository=agent_repo, + principal=_principal(user_id=None, service_account_id=None), + ) + + with pytest.raises(RuntimeError, match="db down"): + await use_case.register_build( + name=f"dw-build-no-creator-fail-{uuid4().hex[:8]}", + description="build persist fails after skipped register", + ) + + authorization_service.register_resource.assert_not_awaited() + authorization_service.deregister_resource.assert_not_awaited() + authorization_service.grant.assert_not_awaited() + authorization_service.revoke.assert_not_awaited() + + +@pytest.mark.integration +@pytest.mark.asyncio +class TestAgentDeregisterOnDelete: + async def test_delete_deregisters_after_soft_delete(self, isolated_repositories): + agent_repo = isolated_repositories["agent_repository"] + use_case, authorization_service = _build_use_case( + agent_repository=agent_repo, + deployment_history_repository=isolated_repositories[ + "deployment_history_repository" + ], + deployment_repository=isolated_repositories["deployment_repository"], + ) + agent = await use_case.register_agent( + name=f"dw-del-{uuid4().hex[:8]}", + description="to be deleted", + acp_url="http://new-acp", + ) + authorization_service.deregister_resource.reset_mock() + + deleted = await use_case.delete(id=agent.id) + + assert deleted.status == AgentStatus.DELETED + persisted = await agent_repo.get(id=agent.id) + assert persisted.status == AgentStatus.DELETED + authorization_service.deregister_resource.assert_awaited_once() + deregistered: AgentexResource = ( + authorization_service.deregister_resource.call_args.args[0] + ) + assert deregistered.type == AgentexResourceType.agent + assert deregistered.selector == agent.id + # Ownership is revoked by the route after delete succeeds. + authorization_service.revoke.assert_not_awaited() + + async def test_delete_swallows_deregister_failure(self, isolated_repositories): + # A deregister failure after a successful soft-delete must not surface: + # Postgres is the source of truth for existence. + agent_repo = isolated_repositories["agent_repository"] + use_case, authorization_service = _build_use_case( + agent_repository=agent_repo, + deployment_history_repository=isolated_repositories[ + "deployment_history_repository" + ], + deployment_repository=isolated_repositories["deployment_repository"], + ) + agent = await use_case.register_agent( + name=f"dw-del-fail-{uuid4().hex[:8]}", + description="delete with failing deregister", + acp_url="http://new-acp", + ) + authorization_service.deregister_resource.reset_mock() + authorization_service.deregister_resource.side_effect = RuntimeError( + "authz down" + ) + + # Must not raise despite the deregister failure. + deleted = await use_case.delete(id=agent.id) + + assert deleted.status == AgentStatus.DELETED + persisted = await agent_repo.get(id=agent.id) + assert persisted.status == AgentStatus.DELETED + authorization_service.deregister_resource.assert_awaited_once() diff --git a/agentex/tests/unit/api/test_agents_register_build_authz.py b/agentex/tests/unit/api/test_agents_register_build_authz.py index ab0b5807..6ef53f60 100644 --- a/agentex/tests/unit/api/test_agents_register_build_authz.py +++ b/agentex/tests/unit/api/test_agents_register_build_authz.py @@ -26,25 +26,19 @@ def _agent() -> AgentEntity: @pytest.mark.unit @pytest.mark.asyncio -async def test_register_build_registers_agent_resource() -> None: - """Build-time agent creation must call register_resource, not grant-only.""" +async def test_register_build_grants_legacy_ownership_after_create() -> None: + """Build-time agent creation gates on authz and grants legacy ownership.""" agents_use_case = MagicMock() agents_use_case.register_build = AsyncMock(return_value=_agent()) authorization_service = MagicMock() authorization_service.check = AsyncMock(return_value=True) authorization_service.register_resource = AsyncMock(return_value=None) authorization_service.grant = AsyncMock(return_value=None) - principal_context = { - "account_id": "account-123", - "user_id": "user-123", - "api_key": "test-key", - } result = await register_build( request=RegisterBuildRequest( name="build-agent", description="Created from build", - principal_context=principal_context, ), agents_use_case=agents_use_case, authorization_service=authorization_service, @@ -54,7 +48,6 @@ async def test_register_build_registers_agent_resource() -> None: authorization_service.check.assert_awaited_once_with( AgentexResource.agent("*"), AuthorizedOperationType.create, - principal_context=principal_context, ) agents_use_case.register_build.assert_awaited_once_with( name="build-agent", @@ -62,22 +55,16 @@ async def test_register_build_registers_agent_resource() -> None: registration_metadata=None, agent_input_type=None, ) - authorization_service.register_resource.assert_awaited_once_with( + authorization_service.register_resource.assert_not_awaited() + authorization_service.grant.assert_awaited_once_with( AgentexResource.agent("agent-123"), - principal_context=principal_context, ) - authorization_service.grant.assert_not_awaited() @pytest.mark.unit @pytest.mark.asyncio -async def test_register_build_existing_agent_path_is_reentrant() -> None: - """Idempotent-by-name path returns an existing agent and re-registers safely. - - The auth layer owns duplicate-resource idempotency; the route must keep - using the resource-registration path for the returned agent instead of - falling back to grant-only behavior. - """ +async def test_register_build_existing_agent_path_stays_route_reentrant() -> None: + """Idempotent-by-name path still grants legacy ownership at the route.""" existing_agent = _agent() agents_use_case = MagicMock() agents_use_case.register_build = AsyncMock(return_value=existing_agent) @@ -85,15 +72,9 @@ async def test_register_build_existing_agent_path_is_reentrant() -> None: authorization_service.check = AsyncMock(return_value=True) authorization_service.register_resource = AsyncMock(return_value=None) authorization_service.grant = AsyncMock(return_value=None) - principal_context = { - "account_id": "account-123", - "user_id": "user-123", - "api_key": "test-key", - } request = RegisterBuildRequest( name="build-agent", description="Created from build", - principal_context=principal_context, ) first = await register_build( @@ -110,8 +91,15 @@ async def test_register_build_existing_agent_path_is_reentrant() -> None: assert first.id == existing_agent.id assert second.id == existing_agent.id assert agents_use_case.register_build.await_count == 2 - assert authorization_service.register_resource.await_count == 2 - for call in authorization_service.register_resource.await_args_list: - assert call.args == (AgentexResource.agent(existing_agent.id),) - assert call.kwargs == {"principal_context": principal_context} - authorization_service.grant.assert_not_awaited() + assert authorization_service.check.await_count == 2 + for call in authorization_service.check.await_args_list: + assert call.args == ( + AgentexResource.agent("*"), + AuthorizedOperationType.create, + ) + assert call.kwargs == {} + authorization_service.register_resource.assert_not_awaited() + assert authorization_service.grant.await_count == 2 + for call in authorization_service.grant.await_args_list: + assert call.args == (AgentexResource.agent("agent-123"),) + assert call.kwargs == {} diff --git a/agentex/tests/unit/api/test_tasks_authz.py b/agentex/tests/unit/api/test_tasks_authz.py index 47926413..7f497734 100644 --- a/agentex/tests/unit/api/test_tasks_authz.py +++ b/agentex/tests/unit/api/test_tasks_authz.py @@ -282,6 +282,48 @@ async def test_update_check_denial_collapses_to_404(self): ) +@pytest.mark.unit +@pytest.mark.asyncio +class TestTaskDeleteAuthWrites: + async def test_delete_task_revokes_legacy_ownership_after_delete(self): + from src.api.routes.tasks import delete_task + + task_use_case = MagicMock() + task_use_case.delete_task = AsyncMock(return_value=None) + authorization = MagicMock() + authorization.revoke = AsyncMock(return_value=None) + + result = await delete_task( + task_id="task-1", + task_use_case=task_use_case, + authorization=authorization, + ) + + task_use_case.delete_task.assert_awaited_once_with(id="task-1") + authorization.revoke.assert_awaited_once_with(AgentexResource.task("task-1")) + assert result.id == "task-1" + + async def test_delete_task_by_name_revokes_resolved_task_after_delete(self): + from src.api.routes.tasks import delete_task_by_name + + task_use_case = MagicMock() + task_use_case.get_task = AsyncMock(return_value=MagicMock(id="task-2")) + task_use_case.delete_task = AsyncMock(return_value=None) + authorization = MagicMock() + authorization.revoke = AsyncMock(return_value=None) + + result = await delete_task_by_name( + task_name="named-task", + task_use_case=task_use_case, + authorization=authorization, + ) + + task_use_case.get_task.assert_awaited_once_with(name="named-task") + task_use_case.delete_task.assert_awaited_once_with(name="named-task") + authorization.revoke.assert_awaited_once_with(AgentexResource.task("task-2")) + assert result.id == "task-2" + + @pytest.mark.unit @pytest.mark.asyncio class TestCheckTaskOrCollapseTo404: diff --git a/agentex/tests/unit/services/test_schedule_service.py b/agentex/tests/unit/services/test_schedule_service.py index 117ed9d7..0e0f4a07 100644 --- a/agentex/tests/unit/services/test_schedule_service.py +++ b/agentex/tests/unit/services/test_schedule_service.py @@ -51,8 +51,7 @@ def mock_temporal_adapter(): @pytest.fixture def mock_authorization_service(): - """Mock authorization service with a resolvable creator principal so the - dual-write register/deregister calls run as no-ops in these unit tests.""" + """Mock authorization service with a resolvable creator principal.""" mock = Mock() mock.principal_context = SimpleNamespace( user_id="user-test", service_account_id=None, account_id="acct-test" diff --git a/agentex/tests/unit/use_cases/test_acp_type_backwards_compatibility_use_case.py b/agentex/tests/unit/use_cases/test_acp_type_backwards_compatibility_use_case.py index e6d13949..e3cb7faf 100644 --- a/agentex/tests/unit/use_cases/test_acp_type_backwards_compatibility_use_case.py +++ b/agentex/tests/unit/use_cases/test_acp_type_backwards_compatibility_use_case.py @@ -250,6 +250,7 @@ async def test_register_agent_defaults_to_async_not_agentic(self): deployment_repository=deployment_repo, # Not testing temporal adapter in this test temporal_adapter=TemporalAdapter(), + authorization_service=make_noop_authorization_service(), ) # Mock repository to return a new agent @@ -292,6 +293,7 @@ async def test_can_explicitly_register_agentic_agent(self): deployment_repository=deployment_repo, # Not testing temporal adapter in this test temporal_adapter=TemporalAdapter(), + authorization_service=make_noop_authorization_service(), ) # Mock repository to return an AGENTIC agent diff --git a/agentex/tests/unit/use_cases/test_agents_use_case.py b/agentex/tests/unit/use_cases/test_agents_use_case.py index ca78390a..6dda4e0c 100644 --- a/agentex/tests/unit/use_cases/test_agents_use_case.py +++ b/agentex/tests/unit/use_cases/test_agents_use_case.py @@ -11,6 +11,7 @@ ) from src.domain.repositories.deployment_repository import DeploymentRepository from src.domain.use_cases.agents_use_case import AgentsUseCase +from tests.fixtures.services import make_noop_authorization_service @pytest.fixture @@ -45,6 +46,7 @@ def agents_use_case( deployment_history_repository=deployment_history_repository, deployment_repository=deployment_repository, temporal_adapter=temporal_adapter, + authorization_service=make_noop_authorization_service(), )