diff --git a/agentex/openapi.yaml b/agentex/openapi.yaml index 100fb9b8..caa12c7f 100644 --- a/agentex/openapi.yaml +++ b/agentex/openapi.yaml @@ -6660,6 +6660,13 @@ components: type: object - type: 'null' title: If provided, replaces task_metadata with this value + merge_params: + anyOf: + - additionalProperties: true + type: object + - type: 'null' + title: Optional shallow-merge patch applied to the task's params column. + Top-level keys overwrite; pass full nested objects to change subfields. type: object title: UpdateTaskRequest ValidationError: diff --git a/agentex/src/api/routes/tasks.py b/agentex/src/api/routes/tasks.py index a2d30227..a8ae9642 100644 --- a/agentex/src/api/routes/tasks.py +++ b/agentex/src/api/routes/tasks.py @@ -193,7 +193,9 @@ async def update_task( task_use_case: DTaskUseCase, ) -> Task: updated_task_entity = await task_use_case.update_mutable_fields_on_task( - id=task_id, task_metadata=request.task_metadata + id=task_id, + task_metadata=request.task_metadata, + merge_params=request.merge_params, ) return Task.model_validate(updated_task_entity) diff --git a/agentex/src/api/schemas/tasks.py b/agentex/src/api/schemas/tasks.py index 9a8a53dd..74c8d83d 100644 --- a/agentex/src/api/schemas/tasks.py +++ b/agentex/src/api/schemas/tasks.py @@ -77,6 +77,14 @@ class UpdateTaskRequest(BaseModel): None, title="If provided, replaces task_metadata with this value", ) + merge_params: dict[str, Any] | None = Field( + None, + title=( + "Optional shallow-merge patch applied to the task's params column. " + "Top-level keys overwrite; pass full nested objects to change " + "subfields." + ), + ) class TaskStatusReasonRequest(BaseModel): diff --git a/agentex/src/domain/repositories/task_repository.py b/agentex/src/domain/repositories/task_repository.py index 231c0e9c..45662015 100644 --- a/agentex/src/domain/repositories/task_repository.py +++ b/agentex/src/domain/repositories/task_repository.py @@ -3,7 +3,8 @@ from typing import Annotated, Literal from fastapi import Depends -from sqlalchemy import distinct, func, select, update +from sqlalchemy import cast, distinct, func, select, update +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import selectinload from src.adapters.crud_store.adapter_postgres import ( ColumnPrimitiveValue, @@ -213,6 +214,46 @@ async def update(self, task: TaskEntity) -> TaskEntity: # Return with agents populated return TaskEntity.model_validate(modified_orm) + async def merge_params(self, task_id: str, patch: dict) -> TaskEntity | None: + """Atomically shallow-merge ``patch`` into the task's ``params`` + JSONB column. Returns the updated entity, or ``None`` if no task + with that id exists. + + Uses Postgres' ``||`` operator on JSONB so the read-modify-write + is collapsed to a single statement (no race with concurrent + writers). ``patch`` keys overwrite existing keys at the top + level; nested objects are NOT deep-merged — pass the full nested + replacement if you need to change a subfield. + + Intended for surfaces that need ``tasks.params`` to reflect the + latest agent config after a live edit (see + ``TasksUseCase.update_mutable_fields_on_task``); not a + general-purpose updater. + """ + + async with ( + self.start_async_db_session(True) as session, + async_sql_exception_handler(), + ): + # ``COALESCE(params, '{}'::jsonb)`` so a NULL existing value + # doesn't poison the concat to NULL. Both operands cast to + # JSONB explicitly so Postgres picks the JSONB ``||`` operator + # (not the text concat overload). + existing = func.coalesce(TaskORM.params, cast({}, JSONB)) + merged = existing.op("||", return_type=JSONB)(cast(patch, JSONB)) + stmt = ( + update(TaskORM) + .where(TaskORM.id == task_id) + .values(params=merged) + .returning(TaskORM) + ) + result = await session.execute(stmt) + row = result.scalar_one_or_none() + await session.commit() + if row is None: + return None + return TaskEntity.model_validate(row) + async def transition_status( self, task_id: str, diff --git a/agentex/src/domain/services/task_service.py b/agentex/src/domain/services/task_service.py index 809dd954..299494b8 100644 --- a/agentex/src/domain/services/task_service.py +++ b/agentex/src/domain/services/task_service.py @@ -242,6 +242,15 @@ async def update_task(self, task: TaskEntity) -> TaskEntity: return updated_task + async def merge_task_params(self, task_id: str, patch: dict) -> TaskEntity | None: + """Atomically shallow-merge ``patch`` into ``tasks.params``. Returns + the updated entity, or ``None`` if no task with ``task_id`` exists. + + Lets callers update agent config on the task row in place; the agent + picks up the new values when it next reads ``task.params``. + """ + return await self.task_repository.merge_params(task_id, patch) + async def delete_task(self, id: str | None = None, name: str | None = None) -> None: """ Delete a task from the repository. diff --git a/agentex/src/domain/use_cases/tasks_use_case.py b/agentex/src/domain/use_cases/tasks_use_case.py index 4ad1a61a..dffc6ec3 100644 --- a/agentex/src/domain/use_cases/tasks_use_case.py +++ b/agentex/src/domain/use_cases/tasks_use_case.py @@ -94,6 +94,7 @@ async def update_mutable_fields_on_task( id: str | None = None, name: str | None = None, task_metadata: dict[str, Any] | None = None, + merge_params: dict[str, Any] | None = None, ) -> TaskEntity: """Update mutable fields on a task entity. This is used by our API since not all fields should be mutable.""" @@ -108,15 +109,28 @@ async def update_mutable_fields_on_task( else: raise ItemDoesNotExist(f"Task {name} not found") - # if no mutations are provided, don't do anything - if task_metadata is None: + # No-op if neither field was supplied. + if task_metadata is None and merge_params is None: return task_entity if task_metadata is not None: task_entity.task_metadata = task_metadata - updated_task_entity = await self.task_service.update_task(task=task_entity) - return updated_task_entity + # `merge_params` is a separate atomic JSONB shallow-merge so concurrent + # callers don't overwrite each other's fields (vs reading→mutating→writing + # the whole params dict on task_entity). Falls through to a normal save + # only when task_metadata also changed. + if merge_params: + merged = await self.task_service.merge_task_params( + task_entity.id, merge_params + ) + if merged is not None: + task_entity = merged + + if task_metadata is not None: + task_entity = await self.task_service.update_task(task=task_entity) + + return task_entity async def _transition_to_terminal( self,