12 changes: 0 additions & 12 deletions agentex/openapi.yaml
@@ -5221,12 +5221,6 @@ components:
acp_type:
$ref: '#/components/schemas/ACPType'
description: The type of ACP to use for the agent.
principal_context:
anyOf:
- {}
- type: 'null'
title: Principal Context
description: Principal used for authorization
registration_metadata:
anyOf:
- additionalProperties: true
@@ -5336,12 +5330,6 @@ components:
type: string
title: Description
description: The description of the agent.
principal_context:
anyOf:
- {}
- type: 'null'
title: Principal Context
description: Principal used for authorization
registration_metadata:
anyOf:
- additionalProperties: true
15 changes: 2 additions & 13 deletions agentex/src/api/routes/agents.py
@@ -76,16 +76,9 @@ async def get_agent_by_name(
AgentexResourceType.agent, AuthorizedOperationType.read
),
agents_use_case: DAgentsUseCase,
authorization: DAuthorizationService,
):
"""Get an agent by its unique name."""
agent_entity = await agents_use_case.get(name=agent_name)

await authorization.check(

Contributor

I think we should keep the authorization.check/revoke/grant. Otherwise, how can we maintain backward compatibility?

1. Removing authorization.check will remove authz altogether.
2. Removing grant/revoke will break the old authz even if the user opts out of the new FGAC.

Even if a user turns FGAC on, we may still want to keep grant/revoke for permission data dual-write, so that the old authz still works in case the user turns FGAC off.

Contributor Author

went and traced this through agentex-auth to make sure we're solid, and I don't think we need to keep grant/revoke here

tl;dr: on the legacy (SGP) backend, register_resource is grant and deregister_resource is revoke. so dropping the explicit grant/revoke and going through register/deregister is behavior-preserving for old authz, not a regression.

1. removing authorization.check removes authz altogether

it doesn't. the only check I removed is the explicit one in get_agent_by_name, but that route still has agent_name: DAuthorizedName(agent, read) in its signature, and that dependency calls authorization.check(agent, read) itself before the handler body runs (authorization_shortcuts.py). so the line I deleted was a redundant second check of the same tuple. read authz is still enforced. on create I kept the check(agent("*"), create) too, just dropped the unused principal_context= kwarg.

2 + 3. removing grant/revoke breaks old authz when FGAC is off

two things:

  • agentex-auth routes every call (grant/revoke/check/register/deregister) to exactly one provider per account based on FGAC_AGENTEX_AUTH_SPARK (_resolve_provider). flag off → legacy SGP, flag on → Spark. there's no write-to-both happening, it's a routing switch.
  • on the SGP gateway, register_resource delegates straight to grant and deregister_resource delegates to revoke

so for an FGAC-off account, register_resource(agent) → SGP → grant(owner), which is exactly what the old route did. deregister_resource → revoke. legacy authz is fully preserved. for FGAC-on it does the proper Spark resource+owner write (tenant scoping comes from account_id automatically). either way it's correct, and keeping grant/revoke on top would just be a redundant duplicate write.
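
For anyone following along, here is a minimal sketch of the routing described above. The names `LegacySGPGateway`, `SparkGateway`, and `resolve_provider` are hypothetical stand-ins, not the actual agentex-auth code; only the shape (one provider per call, and register/deregister delegating to grant/revoke on the legacy side) is taken from this comment.

```python
import asyncio


class LegacySGPGateway:
    """Hypothetical stand-in for the legacy (SGP) provider."""

    def __init__(self) -> None:
        self.owners: set[str] = set()

    async def grant(self, resource: str) -> None:
        self.owners.add(resource)

    async def revoke(self, resource: str) -> None:
        self.owners.discard(resource)

    # On the legacy backend, register/deregister simply delegate.
    async def register_resource(self, resource: str) -> None:
        await self.grant(resource)

    async def deregister_resource(self, resource: str) -> None:
        await self.revoke(resource)


class SparkGateway:
    """Hypothetical stand-in for the FGAC (Spark) provider."""

    def __init__(self) -> None:
        self.resources: set[str] = set()

    async def register_resource(self, resource: str) -> None:
        self.resources.add(resource)

    async def deregister_resource(self, resource: str) -> None:
        self.resources.discard(resource)


def resolve_provider(fgac_enabled: bool, legacy, spark):
    # Routing switch: exactly one provider handles the call, never both.
    return spark if fgac_enabled else legacy


async def demo() -> tuple[set[str], set[str]]:
    legacy, spark = LegacySGPGateway(), SparkGateway()
    # Flag off: the call lands on the legacy provider and becomes a grant.
    await resolve_provider(False, legacy, spark).register_resource("agent:a1")
    # Flag on: the call lands on Spark only.
    await resolve_provider(True, legacy, spark).register_resource("agent:a2")
    return legacy.owners, spark.resources
```

In this sketch each resource ends up in exactly one store, which is the property the argument relies on: an extra explicit grant on top of register_resource would only repeat the legacy write.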

resource=AgentexResource.agent(agent_entity.id),
operation=AuthorizedOperationType.read,
)

Comment on lines -84 to -88

Contributor

Why are we removing this? Does it now happen at a higher layer?

Contributor Author

it's moved up into the parameter dependency, so the body call was a duplicate. agent_name is typed DAuthorizedName(AgentexResourceType.agent, read), and that dependency (_ensure_authorized_name in src/utils/authorization_shortcuts.py) resolves the agent by name + runs authorization.check(agent(resolved_id), read) before the handler body executes.

get_agent_by_id directly above already works this way
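
A framework-free sketch of why the body call was a duplicate, assuming the dependency behaves as described above. `ensure_authorized_name`, `dispatch`, and the `id-of-` lookup are hypothetical simplifications; the real dependency lives in src/utils/authorization_shortcuts.py and is resolved by the web framework rather than by hand.

```python
import asyncio

calls: list[str] = []


async def check(resource: str, operation: str) -> None:
    # Stand-in for authorization.check; records the call instead of calling out.
    calls.append(f"check:{resource}:{operation}")


async def ensure_authorized_name(agent_name: str) -> str:
    # Hypothetical dependency: resolve name -> id, then check read access.
    resolved_id = f"id-of-{agent_name}"
    await check(f"agent:{resolved_id}", "read")
    return agent_name


async def get_agent_by_name(agent_name: str) -> dict:
    # Handler body: no second check needed, the dependency already ran one.
    calls.append("handler")
    return {"name": agent_name}


async def dispatch(raw_name: str) -> dict:
    # Mimics a framework resolving parameter dependencies before the handler.
    authorized_name = await ensure_authorized_name(raw_name)
    return await get_agent_by_name(authorized_name)
```

Running `asyncio.run(dispatch("my-agent"))` records one check followed by the handler, so a second check of the same (resource, operation) pair inside the body adds nothing.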

Contributor

this was duplicative even before this PR.

return Agent.model_validate(agent_entity)


@@ -183,7 +176,6 @@ async def register_agent(
await authorization_service.check(
AgentexResource.agent("*"),
AuthorizedOperationType.create,
principal_context=request.principal_context,
)
logger.info(f"Registering agent: {request}")
try:
@@ -198,7 +190,6 @@ async def register_agent(
)
await authorization_service.grant(
AgentexResource.agent(agent_entity.id),
principal_context=request.principal_context,
)
response_fields = agent_entity.model_dump()
existing_key = await api_keys_use_case.get_internal_api_key_by_agent_id(
@@ -236,11 +227,10 @@ async def register_build(
agents_use_case: DAgentsUseCase,
authorization_service: DAuthorizationService,
) -> Agent:
"""Create a build-only agent row and register its authz resource."""
"""Create a build-only agent row after create authorization."""
await authorization_service.check(
AgentexResource.agent("*"),
AuthorizedOperationType.create,
principal_context=request.principal_context,
)
logger.info(f"Registering build for agent: {request.name}")
try:
@@ -252,9 +242,8 @@ async def register_build(
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
await authorization_service.register_resource(
await authorization_service.grant(

Contributor

So we do a register resource in agentex/src/domain/use_cases/agents_use_case.py and then a grant out here. Do we need to do both? Can we keep the authz checks/grants at the router layer ideally instead of nested in the business logic? Thoughts?

Contributor Author

we need both, they're the two backends of the dual-write, not redundant. register_resource records the agent in the new authz graph; grant records the legacy owner edge.

Contributor

sure makes sense but still can we move it up from the business layer? or move this call down. Ideally all authz checks and writes happen at the highest layer possible IMO

Contributor Author

agree on the principle for the checks. gating belongs at the edge and it already is there: the check runs in the route and in the DAuthorizedId / DAuthorizedName param dependencies (which resolve before the handler body executes), so access control happens at the highest layer.

register_resource / deregister_resource are not authz gating. they're resource-graph writes that have to be transactionally consistent with the DB persist:

  • run before the row is persisted (fail-closed → no orphaned row),
  • compensate with a deregister if the persist loses a duplicate-create race or fails,
  • fire only on a genuine create (the get-or-create logic in the use case is what knows create-vs-update; re-registering on an update would rewrite ownership).

The route only gets control after register_agent returns (post-persist), and the agent id is generated inside it, so it can't do register-before-persist or know create-vs-update without exposing the transaction internals upward. Moving it to the router would pull persistence/race logic into the route, which is worse.
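
A minimal sketch of the ordering argued for above: register before persist, and compensate with a deregister if the persist fails. `FakeAuth`, `FakeRepo`, and `create_agent` are hypothetical in-memory stand-ins, not the use-case code, and the sketch leaves out the create-vs-update decision entirely.

```python
import asyncio


class FakeAuth:
    """Hypothetical in-memory authorization graph."""

    def __init__(self) -> None:
        self.registered: set[str] = set()

    async def register_resource(self, resource_id: str) -> None:
        self.registered.add(resource_id)

    async def deregister_resource(self, resource_id: str) -> None:
        self.registered.discard(resource_id)


class FakeRepo:
    """Hypothetical store; rejects duplicate names like a unique constraint."""

    def __init__(self) -> None:
        self.rows: dict[str, str] = {}  # name -> id

    async def create(self, agent_id: str, name: str) -> None:
        if name in self.rows:
            raise ValueError(f"duplicate agent name: {name}")
        self.rows[name] = agent_id


async def create_agent(auth: FakeAuth, repo: FakeRepo, agent_id: str, name: str) -> str:
    # 1. Register first: if this raises, nothing has been persisted (fail closed).
    await auth.register_resource(agent_id)
    try:
        # 2. Persist the row.
        await repo.create(agent_id, name)
    except Exception:
        # 3. Compensate so no authorization entry outlives a failed persist.
        await auth.deregister_resource(agent_id)
        raise
    return agent_id


async def demo() -> tuple[set[str], dict[str, str]]:
    auth, repo = FakeAuth(), FakeRepo()
    await create_agent(auth, repo, "a1", "my-agent")
    try:
        # Second create with the same name loses the duplicate-create race.
        await create_agent(auth, repo, "a2", "my-agent")
    except ValueError:
        pass
    return auth.registered, repo.rows
```

Both the id and the try/except sit inside `create_agent`, which is the point being made: a caller that only sees the return value cannot run step 1 before step 2 or issue the compensation in step 3.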

AgentexResource.agent(agent_entity.id),
principal_context=request.principal_context,
)
return Agent.model_validate(agent_entity)

6 changes: 0 additions & 6 deletions agentex/src/api/schemas/agents.py
@@ -88,9 +88,6 @@ class RegisterAgentRequest(BaseModel):
description="Optional agent ID if the agent already exists and needs to be updated.",
)
acp_type: ACPType = Field(..., description="The type of ACP to use for the agent.")
principal_context: Any | None = Field(
default=None, description="Principal used for authorization"
)
registration_metadata: dict[str, Any] | None = Field(
default=None,
description="The metadata for the agent's registration.",
@@ -120,9 +117,6 @@ class RegisterBuildRequest(BaseModel):
..., pattern=r"^[a-z0-9-]+$", description="The unique name of the agent."
)
description: str = Field(..., description="The description of the agent.")
principal_context: Any | None = Field(
default=None, description="Principal used for authorization"
)
registration_metadata: dict[str, Any] | None = Field(
default=None,
description="The metadata for the agent's build registration.",
52 changes: 26 additions & 26 deletions agentex/src/domain/services/schedule_service.py
@@ -84,13 +84,12 @@ async def create_schedule(
else None
)

# Schedules have no Postgres row Temporal is the store and the auth
# Schedules have no Postgres row: Temporal is the store and the auth
# selector is the schedule id ({agent_id}--{schedule_name}). Register
# the auth tuple (with the parent_agent edge) BEFORE the Temporal write
# (fail-closed), and compensate if the Temporal create fails so we never
# leave an orphan tuple. The read-back below is intentionally OUTSIDE
# the compensation scope: a describe failure must not deregister a
# schedule that was actually created.
# before the Temporal write so an auth failure fails closed. If the
# Temporal create fails after registration, compensate with a deregister.
# The read-back below is intentionally outside the compensation scope
# because the schedule was already created.
registered = await self._register_schedule_in_auth(
schedule_id=schedule_id, agent_id=agent.id
)
@@ -109,7 +108,7 @@ async def create_schedule(
paused=request.paused,
)
except Exception:
# Orphan-tuple guard: the tuple was written but the schedule never
# Orphan guard: the auth entry was written but the schedule never
# landed in Temporal. Best-effort compensating deregister, then
# re-raise the original error.
if registered:
@@ -122,17 +121,15 @@ async def create_schedule(
async def _register_schedule_in_auth(
self, *, schedule_id: str, agent_id: str
) -> bool:
"""Register a new agent_schedule with the auth service, including the
parent_agent edge so permissions cascade from the owning agent.
"""Register the schedule in the authorization graph before creating it.

Called BEFORE the Temporal write — a failure raises and prevents the
schedule from being created. Skipped with a warning when no usable
creator identity is available on the principal context (e.g.
agent-bypass / internal paths without an authenticated user); this is
the interim behavior until on-behalf-of-user identity is threaded.
The schedule is registered under its parent agent so permissions
cascade from the owning agent. Registering before the Temporal create
fails closed: an auth failure aborts the create, and the caller
compensates with a deregister if the Temporal create later fails.

Returns True when a tuple was actually registered (so the caller knows
whether a compensating deregister is warranted), False when skipped.
Returns True when the schedule was registered, or False when no creator
identity is resolvable and registration is skipped.
"""
principal_context = self.authorization_service.principal_context
user_id = getattr(principal_context, "user_id", None)
@@ -149,9 +146,8 @@ async def _register_schedule_in_auth(
parent=AgentexResource.agent(agent_id),
)
except Exception as exc:
# Fail closed: log + re-raise so the schedule is never created.
logger.exception(
"Auth register_resource failed for agent_schedule; aborting create",
"Auth registration failed for agent_schedule; aborting create",
extra={
"schedule_id": schedule_id,
"agent_id": agent_id,
@@ -162,20 +158,19 @@ async def _register_schedule_in_auth(
return True

async def _deregister_schedule_from_auth(self, *, schedule_id: str) -> None:
"""Best-effort deregistration of a schedule's auth tuples.
"""Best-effort removal of the schedule from the authorization graph.

``deregister_resource`` removes the resource and all of its
relationships (owner, parent, grantees) atomically. Used both on delete
and as the compensating action when a Temporal create fails. Failures
are logged but never propagate.
Temporal is the source of truth for schedule existence. Once Temporal
delete succeeds, a deregister failure is logged but does not block the
delete response.
"""
try:
await self.authorization_service.deregister_resource(
resource=AgentexResource.schedule(schedule_id),
)
except Exception as exc:
logger.warning(
"Auth deregister failed for agent_schedule; tuple may be orphaned",
"Auth deregister failed for agent_schedule; entry may be orphaned",
extra={
"schedule_id": schedule_id,
"error_type": type(exc).__name__,
@@ -392,7 +387,10 @@ def _description_to_response(
# Decode bytes to string if possible
try:
import json
workflow_params.append(json.loads(arg.data.decode("utf-8")))

workflow_params.append(
json.loads(arg.data.decode("utf-8"))
)
except (json.JSONDecodeError, UnicodeDecodeError):
workflow_params.append(str(arg.data))
else:
@@ -430,7 +428,9 @@ def _description_to_response(
if hasattr(info, "recent_actions") and info.recent_actions:
# ScheduleActionResult has started_at (when action started) and scheduled_at (when it was scheduled)
last_action = info.recent_actions[-1]
last_action_time = getattr(last_action, "started_at", None) or getattr(last_action, "scheduled_at", None)
last_action_time = getattr(last_action, "started_at", None) or getattr(
last_action, "scheduled_at", None
)
created_at: datetime | None = (
cast(datetime, info.create_time)
if hasattr(info, "create_time") and info.create_time
9 changes: 4 additions & 5 deletions agentex/src/domain/services/task_service.py
@@ -64,11 +64,10 @@ async def create_task(
Returns:
Task containing the created task info
"""
# Register in the authorization service before persisting: a registration
# failure aborts the request with no orphaned row. If the persist fails
# after a successful registration, the compensating deregister_resource
# below prevents a dangling authorization entry. Both calls are no-ops
# when the authorization service is disabled for this account.
# Register the task in the authorization graph before persisting: a
# registration failure aborts the request with no orphaned row. If the
# persist fails after a successful registration, the compensating
# deregister_resource below prevents a dangling authorization entry.
task_entity = TaskEntity(
id=orm_id(),
name=task_name,
33 changes: 14 additions & 19 deletions agentex/src/domain/use_cases/agent_api_keys_use_case.py
@@ -112,14 +112,12 @@ async def _register_api_key_in_auth(
api_key_id: str,
agent_id: str,
) -> None:
"""Register a new agent_api_key with the auth service, including the
parent_agent edge so permissions cascade from the owning agent.

Called BEFORE the Postgres write — a failure raises and prevents the
row from being persisted, so there is no compensating action. Skipped
with a warning when no usable creator identity is available on the
principal context (e.g. internal-key creation paths without an
authenticated user).
"""Register a new agent_api_key in the authorization graph before persist.

The api key is registered under its parent agent so permissions cascade
from the owning agent. Registering before the Postgres row is created
fails closed: a failure aborts the create rather than leaving a
persisted api key that cannot be authorized.
"""
principal_context = self.authorization_service.principal_context
user_id = getattr(principal_context, "user_id", None)
@@ -139,9 +137,8 @@ async def _register_api_key_in_auth(
parent=AgentexResource.agent(agent_id),
)
except Exception as exc:
# Fail closed: log + re-raise so the Postgres row is never written.
logger.exception(
"Auth register_resource failed for agent_api_key; aborting create",
"Auth registration failed for agent_api_key; aborting create",
extra={
"api_key_id": api_key_id,
"agent_id": agent_id,
@@ -151,21 +148,19 @@ async def _register_api_key_in_auth(
raise

async def _deregister_api_key_from_auth(self, *, api_key_id: str) -> None:
"""Best-effort deregistration of an api_key's auth tuples on delete.
"""Best-effort removal of the api_key from the authorization graph.

``deregister_resource`` removes the resource and all of its
relationships (owner, parent, grantees) atomically. Always invoked;
the authorization service decides how to route the call. Failures are
logged but do not block the delete.
Deletes treat Postgres as the source of truth for existence. Once the
row is gone, a deregister failure is logged but does not block the
delete response.
"""
try:
await self.authorization_service.deregister_resource(
resource=AgentexResource.api_key(api_key_id),
)
except Exception as exc:
# Best-effort: log and continue. Postgres row already deleted.
logger.warning(
"Auth deregister failed for agent_api_key; tuple may be orphaned",
"Auth deregister failed for agent_api_key; entry may be orphaned",
extra={
"api_key_id": api_key_id,
"error_type": type(exc).__name__,
@@ -203,8 +198,8 @@ async def get_external_by_agent_id_and_key(
)

async def delete(self, id: str) -> None:
# Pre-fetch so we skip both the delete and the deregister when the row
# never existed no DB round-trip, no auth round-trip for a no-op.
# Pre-fetch so we skip both the delete and auth cleanup when the row
# never existed: no DB round-trip, no auth round-trip for a no-op.
try:
await self.agent_api_key_repo.get(id=id)
except ItemDoesNotExist:
15 changes: 6 additions & 9 deletions agentex/src/domain/use_cases/agents_acp_use_case.py
@@ -10,9 +10,7 @@
AuthenticationServiceUnavailableError,
)
from src.adapters.crud_store.exceptions import ItemDoesNotExist
from src.api.schemas.authorization_types import (
AgentexResource,
)
from src.api.schemas.authorization_types import AgentexResource
from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus
from src.domain.entities.agents_rpc import (
ACP_TYPE_TO_ALLOWED_RPC_METHODS,
@@ -237,7 +235,7 @@ async def _execute_with_error_handling(
raise e

async def grant_with_retry(self, task: TaskEntity, attempts: int = 0) -> None:
"""Grant authorization for a task with retry"""
"""Grant ownership for a newly created task."""
try:
await self.authorization_service.grant(
resource=AgentexResource.task(task.id),
@@ -250,11 +248,10 @@ async def grant_with_retry(self, task: TaskEntity, attempts: int = 0) -> None:
)
await asyncio.sleep(delay)
return await self.grant_with_retry(task, attempts + 1)
else:
logger.error(
f"Authentication service unavailable: {e}. Max retries reached."
)
raise e from e
logger.error(
f"Authentication service unavailable: {e}. Max retries reached."
)
raise e from e
except Exception as e:
logger.error(f"Error granting authorization for task {task.id}: {e}")
await self.task_service.fail_task(task, str(e))