From aa2eb572a373400b5549ac544c4f9a5febb5717e Mon Sep 17 00:00:00 2001 From: Michael Chou Date: Wed, 10 Jun 2026 16:01:13 -0700 Subject: [PATCH 1/4] update task params in db; send temporal signal to workflow --- agentex/openapi.yaml | 78 +++++++++++++++++++ agentex/src/api/routes/tasks.py | 27 +++++++ agentex/src/api/schemas/tasks.py | 32 ++++++++ .../domain/repositories/task_repository.py | 42 +++++++++- agentex/src/domain/services/task_service.py | 10 +++ .../src/domain/use_cases/tasks_use_case.py | 46 +++++++++++ 6 files changed, 234 insertions(+), 1 deletion(-) diff --git a/agentex/openapi.yaml b/agentex/openapi.yaml index b122020e..0ad7f0fb 100644 --- a/agentex/openapi.yaml +++ b/agentex/openapi.yaml @@ -824,6 +824,42 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /tasks/{task_id}/signal: + post: + tags: + - Tasks + summary: Signal Task + description: Dispatch a Temporal signal to the task's workflow. The workflow + must have a `@workflow.signal(name=...)` handler registered for the supplied + `signal_name`; the optional `payload` is forwarded as the signal's single + argument. Returns 4xx if the task isn't currently RUNNING. + operationId: signal_task_tasks__task_id__signal_post + parameters: + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/SignalTaskRequest' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/Task' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /tasks/{task_id}/stream: get: tags: @@ -5638,6 +5674,48 @@ components: required: - content title: SendMessageRequest + SignalTaskRequest: + properties: + signal_name: + type: string + title: Name of the Temporal signal to dispatch + payload: + anyOf: + - additionalProperties: true + type: object + - type: 'null' + title: Optional JSON payload forwarded to the workflow's signal handler + merge_params: + anyOf: + - additionalProperties: true + type: object + - type: 'null' + title: Optional shallow-merge patch applied to the task's params column + after the signal succeeds. Top-level keys overwrite; pass full nested + objects to change subfields. + type: object + required: + - signal_name + title: SignalTaskRequest + description: 'Dispatch a Temporal signal to a running task''s workflow. + + + The workflow must register a matching `@workflow.signal(name=...)` + + handler for the supplied ``signal_name``. ``payload`` is forwarded as + + the signal''s single argument; the workflow is responsible for any + + shape validation. + + + ``merge_params`` is an optional shallow-merge into the task''s stored + + ``params`` JSONB column. Used by live-config flows that want the + + persisted task row to reflect the new config alongside the workflow + + signal (e.g. ConfigModal Save).' Span: properties: id: diff --git a/agentex/src/api/routes/tasks.py b/agentex/src/api/routes/tasks.py index dc557ed3..af196dec 100644 --- a/agentex/src/api/routes/tasks.py +++ b/agentex/src/api/routes/tasks.py @@ -12,6 +12,7 @@ ) from src.api.schemas.delete_response import DeleteResponse from src.api.schemas.tasks import ( + SignalTaskRequest, Task, TaskRelationships, TaskResponse, @@ -302,6 +303,32 @@ async def timeout_task( return Task.model_validate(updated) +@router.post( + "/{task_id}/signal", + response_model=Task, + summary="Signal Task", + description=( + "Dispatch a Temporal signal to the task's workflow. The workflow " + "must have a `@workflow.signal(name=...)` handler registered for " + "the supplied `signal_name`; the optional `payload` is forwarded " + "as the signal's single argument. Returns 4xx if the task isn't " + "currently RUNNING." + ), +) +async def signal_task( + task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.update), + request: SignalTaskRequest, + task_use_case: DTaskUseCase, +) -> Task: + updated = await task_use_case.signal_task( + id=task_id, + signal_name=request.signal_name, + payload=request.payload, + merge_params=request.merge_params, + ) + return Task.model_validate(updated) + + @router.get( "/{task_id}/stream", summary="Stream Task Events by ID", diff --git a/agentex/src/api/schemas/tasks.py b/agentex/src/api/schemas/tasks.py index 9a8a53dd..bf6e52f6 100644 --- a/agentex/src/api/schemas/tasks.py +++ b/agentex/src/api/schemas/tasks.py @@ -84,3 +84,35 @@ class TaskStatusReasonRequest(BaseModel): None, title="Optional reason for the status change", ) + + +class SignalTaskRequest(BaseModel): + """Dispatch a Temporal signal to a running task's workflow. + + The workflow must register a matching `@workflow.signal(name=...)` + handler for the supplied ``signal_name``. ``payload`` is forwarded as + the signal's single argument; the workflow is responsible for any + shape validation. + + ``merge_params`` is an optional shallow-merge into the task's stored + ``params`` JSONB column. Used by live-config flows that want the + persisted task row to reflect the new config alongside the workflow + signal (e.g. ConfigModal Save). + """ + + signal_name: str = Field( + ..., + title="Name of the Temporal signal to dispatch", + ) + payload: dict[str, Any] | None = Field( + None, + title="Optional JSON payload forwarded to the workflow's signal handler", + ) + merge_params: dict[str, Any] | None = Field( + None, + title=( + "Optional shallow-merge patch applied to the task's params column " + "after the signal succeeds. Top-level keys overwrite; pass full " + "nested objects to change subfields." + ), + ) diff --git a/agentex/src/domain/repositories/task_repository.py b/agentex/src/domain/repositories/task_repository.py index 231c0e9c..96949bc3 100644 --- a/agentex/src/domain/repositories/task_repository.py +++ b/agentex/src/domain/repositories/task_repository.py @@ -3,7 +3,8 @@ from typing import Annotated, Literal from fastapi import Depends -from sqlalchemy import distinct, func, select, update +from sqlalchemy import cast, distinct, func, select, update +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import selectinload from src.adapters.crud_store.adapter_postgres import ( ColumnPrimitiveValue, @@ -213,6 +214,45 @@ async def update(self, task: TaskEntity) -> TaskEntity: # Return with agents populated return TaskEntity.model_validate(modified_orm) + async def merge_params(self, task_id: str, patch: dict) -> TaskEntity | None: + """Atomically shallow-merge ``patch`` into the task's ``params`` + JSONB column. Returns the updated entity, or ``None`` if no task + with that id exists. + + Uses Postgres' ``||`` operator on JSONB so the read-modify-write + is collapsed to a single statement (no race with concurrent + writers). ``patch`` keys overwrite existing keys at the top + level; nested objects are NOT deep-merged — pass the full nested + replacement if you need to change a subfield. + + Intended for surfaces that need ``tasks.params`` to reflect the + latest agent config after a live edit (see + ``TasksUseCase.signal_task``); not a general-purpose updater. + """ + + async with ( + self.start_async_db_session(True) as session, + async_sql_exception_handler(), + ): + # ``COALESCE(params, '{}'::jsonb)`` so a NULL existing value + # doesn't poison the concat to NULL. Both operands cast to + # JSONB explicitly so Postgres picks the JSONB ``||`` operator + # (not the text concat overload). + existing = func.coalesce(TaskORM.params, cast({}, JSONB)) + merged = existing.op("||", return_type=JSONB)(cast(patch, JSONB)) + stmt = ( + update(TaskORM) + .where(TaskORM.id == task_id) + .values(params=merged) + .returning(TaskORM) + ) + result = await session.execute(stmt) + row = result.scalar_one_or_none() + await session.commit() + if row is None: + return None + return TaskEntity.model_validate(row) + async def transition_status( self, task_id: str, diff --git a/agentex/src/domain/services/task_service.py b/agentex/src/domain/services/task_service.py index 809dd954..9cdee218 100644 --- a/agentex/src/domain/services/task_service.py +++ b/agentex/src/domain/services/task_service.py @@ -242,6 +242,16 @@ async def update_task(self, task: TaskEntity) -> TaskEntity: return updated_task + async def merge_task_params(self, task_id: str, patch: dict) -> TaskEntity | None: + """Atomically merge ``patch`` into ``tasks.params``. Returns the + updated entity, or ``None`` if no task with ``task_id`` exists. + + Used by live-config flows (e.g. ConfigModal Save → signal + + DB persistence) so the persisted task row reflects the new + agent config without waiting for the next task to be created. + """ + return await self.task_repository.merge_params(task_id, patch) + async def delete_task(self, id: str | None = None, name: str | None = None) -> None: """ Delete a task from the repository. diff --git a/agentex/src/domain/use_cases/tasks_use_case.py b/agentex/src/domain/use_cases/tasks_use_case.py index 4ad1a61a..f3e30816 100644 --- a/agentex/src/domain/use_cases/tasks_use_case.py +++ b/agentex/src/domain/use_cases/tasks_use_case.py @@ -3,6 +3,7 @@ from fastapi import Depends from src.adapters.crud_store.exceptions import ItemDoesNotExist +from src.adapters.temporal.adapter_temporal import DTemporalAdapter from src.domain.entities.tasks import TaskEntity, TaskRelationships, TaskStatus from src.domain.exceptions import ClientError from src.domain.services.task_service import DAgentTaskService @@ -19,8 +20,10 @@ class TasksUseCase: def __init__( self, task_service: DAgentTaskService, + temporal_adapter: DTemporalAdapter, ): self.task_service = task_service + self.temporal_adapter = temporal_adapter async def get_task( self, @@ -192,5 +195,48 @@ async def timeout_task( TaskStatus.TIMED_OUT, id=id, name=name, reason=reason ) + async def signal_task( + self, + id: str, + signal_name: str, + payload: dict[str, Any] | None = None, + merge_params: dict[str, Any] | None = None, + ) -> TaskEntity: + """Dispatch a Temporal signal to a running task's workflow. + + Validates that the task is currently RUNNING before signaling — + Temporal will raise its own ``WorkflowNotFoundError`` for closed + workflows, but returning a 4xx-mapped ``ClientError`` upfront is + friendlier than the bare-exception round-trip. The signal name + + payload are forwarded as-is; the workflow's registered + ``@workflow.signal`` handler owns shape validation. + + ``workflow_id`` matches the task id in this codebase (see + TasksService.create_task — Temporal workflows are started with + ``workflow_id=task.id``). + + Optional ``merge_params``: if supplied, shallow-merges into the + task's ``params`` JSONB column after the signal dispatch + succeeds. Used by live-config flows so the persisted task row + reflects the new agent config without waiting for the next task + to be created. Returns the task entity post-merge (or pre-merge + if no patch was given). + """ + task = await self.task_service.get_task(id=id) + if task.status != TaskStatus.RUNNING: + raise ClientError( + f"Task {id} is not running (status={task.status}); cannot signal." + ) + await self.temporal_adapter.signal_workflow( + workflow_id=id, + signal=signal_name, + arg=payload, + ) + if merge_params: + merged = await self.task_service.merge_task_params(id, merge_params) + if merged is not None: + return merged + return task + DTaskUseCase = Annotated[TasksUseCase, Depends(TasksUseCase)] From 2ee665ffdb7aa2d5a9fb3d84ae6872f33253ee00 Mon Sep 17 00:00:00 2001 From: Michael Chou Date: Thu, 18 Jun 2026 15:32:45 -0700 Subject: [PATCH 2/4] update task --- agentex/openapi.yaml | 9 ++++++++ agentex/src/api/routes/tasks.py | 4 +++- agentex/src/api/schemas/tasks.py | 9 ++++++++ .../src/domain/use_cases/tasks_use_case.py | 22 +++++++++++++++---- 4 files changed, 39 insertions(+), 5 deletions(-) diff --git a/agentex/openapi.yaml b/agentex/openapi.yaml index 0ad7f0fb..c8d37299 100644 --- a/agentex/openapi.yaml +++ b/agentex/openapi.yaml @@ -6732,6 +6732,15 @@ components: type: object - type: 'null' title: If provided, replaces task_metadata with this value + merge_params: + anyOf: + - additionalProperties: true + type: object + - type: 'null' + title: Optional shallow-merge patch applied to the task's params column. + Top-level keys overwrite; pass full nested objects to change subfields. + The worker is expected to re-read task.params at turn boundaries to pick + up the new values. type: object title: UpdateTaskRequest ValidationError: diff --git a/agentex/src/api/routes/tasks.py b/agentex/src/api/routes/tasks.py index af196dec..87f5fbae 100644 --- a/agentex/src/api/routes/tasks.py +++ b/agentex/src/api/routes/tasks.py @@ -194,7 +194,9 @@ async def update_task( task_use_case: DTaskUseCase, ) -> Task: updated_task_entity = await task_use_case.update_mutable_fields_on_task( - id=task_id, task_metadata=request.task_metadata + id=task_id, + task_metadata=request.task_metadata, + merge_params=request.merge_params, ) return Task.model_validate(updated_task_entity) diff --git a/agentex/src/api/schemas/tasks.py b/agentex/src/api/schemas/tasks.py index bf6e52f6..f66f2569 100644 --- a/agentex/src/api/schemas/tasks.py +++ b/agentex/src/api/schemas/tasks.py @@ -77,6 +77,15 @@ class UpdateTaskRequest(BaseModel): None, title="If provided, replaces task_metadata with this value", ) + merge_params: dict[str, Any] | None = Field( + None, + title=( + "Optional shallow-merge patch applied to the task's params column. " + "Top-level keys overwrite; pass full nested objects to change " + "subfields. The worker is expected to re-read task.params at turn " + "boundaries to pick up the new values." + ), + ) class TaskStatusReasonRequest(BaseModel): diff --git a/agentex/src/domain/use_cases/tasks_use_case.py b/agentex/src/domain/use_cases/tasks_use_case.py index f3e30816..0eb13f53 100644 --- a/agentex/src/domain/use_cases/tasks_use_case.py +++ b/agentex/src/domain/use_cases/tasks_use_case.py @@ -97,6 +97,7 @@ async def update_mutable_fields_on_task( id: str | None = None, name: str | None = None, task_metadata: dict[str, Any] | None = None, + merge_params: dict[str, Any] | None = None, ) -> TaskEntity: """Update mutable fields on a task entity. This is used by our API since not all fields should be mutable.""" @@ -111,15 +112,28 @@ async def update_mutable_fields_on_task( else: raise ItemDoesNotExist(f"Task {name} not found") - # if no mutations are provided, don't do anything - if task_metadata is None: + # No-op if neither field was supplied. + if task_metadata is None and merge_params is None: return task_entity if task_metadata is not None: task_entity.task_metadata = task_metadata - updated_task_entity = await self.task_service.update_task(task=task_entity) - return updated_task_entity + # `merge_params` is a separate atomic JSONB shallow-merge so concurrent + # callers don't overwrite each other's fields (vs reading→mutating→writing + # the whole params dict on task_entity). Falls through to a normal save + # only when task_metadata also changed. + if merge_params: + merged = await self.task_service.merge_task_params( + task_entity.id, merge_params + ) + if merged is not None: + task_entity = merged + + if task_metadata is not None: + task_entity = await self.task_service.update_task(task=task_entity) + + return task_entity async def _transition_to_terminal( self, From 5dc4a24463f1ded20e03c934ee65804c752465b2 Mon Sep 17 00:00:00 2001 From: Michael Chou Date: Thu, 18 Jun 2026 15:53:35 -0700 Subject: [PATCH 3/4] remove signal --- agentex/openapi.yaml | 78 ------------------- agentex/src/api/routes/tasks.py | 27 ------- agentex/src/api/schemas/tasks.py | 32 -------- .../domain/repositories/task_repository.py | 3 +- .../src/domain/use_cases/tasks_use_case.py | 46 ----------- 5 files changed, 2 insertions(+), 184 deletions(-) diff --git a/agentex/openapi.yaml b/agentex/openapi.yaml index 449211ff..0716710e 100644 --- a/agentex/openapi.yaml +++ b/agentex/openapi.yaml @@ -824,42 +824,6 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /tasks/{task_id}/signal: - post: - tags: - - Tasks - summary: Signal Task - description: Dispatch a Temporal signal to the task's workflow. The workflow - must have a `@workflow.signal(name=...)` handler registered for the supplied - `signal_name`; the optional `payload` is forwarded as the signal's single - argument. Returns 4xx if the task isn't currently RUNNING. - operationId: signal_task_tasks__task_id__signal_post - parameters: - - name: task_id - in: path - required: true - schema: - type: string - title: Task Id - requestBody: - required: true - content: - application/json: - schema: - $ref: '#/components/schemas/SignalTaskRequest' - responses: - '200': - description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/Task' - '422': - description: Validation Error - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' /tasks/{task_id}/stream: get: tags: @@ -5680,48 +5644,6 @@ components: required: - content title: SendMessageRequest - SignalTaskRequest: - properties: - signal_name: - type: string - title: Name of the Temporal signal to dispatch - payload: - anyOf: - - additionalProperties: true - type: object - - type: 'null' - title: Optional JSON payload forwarded to the workflow's signal handler - merge_params: - anyOf: - - additionalProperties: true - type: object - - type: 'null' - title: Optional shallow-merge patch applied to the task's params column - after the signal succeeds. Top-level keys overwrite; pass full nested - objects to change subfields. - type: object - required: - - signal_name - title: SignalTaskRequest - description: 'Dispatch a Temporal signal to a running task''s workflow. - - - The workflow must register a matching `@workflow.signal(name=...)` - - handler for the supplied ``signal_name``. ``payload`` is forwarded as - - the signal''s single argument; the workflow is responsible for any - - shape validation. - - - ``merge_params`` is an optional shallow-merge into the task''s stored - - ``params`` JSONB column. Used by live-config flows that want the - - persisted task row to reflect the new config alongside the workflow - - signal (e.g. ConfigModal Save).' Span: properties: id: diff --git a/agentex/src/api/routes/tasks.py b/agentex/src/api/routes/tasks.py index 3a3aa9f8..a8ae9642 100644 --- a/agentex/src/api/routes/tasks.py +++ b/agentex/src/api/routes/tasks.py @@ -12,7 +12,6 @@ ) from src.api.schemas.delete_response import DeleteResponse from src.api.schemas.tasks import ( - SignalTaskRequest, Task, TaskRelationships, TaskResponse, @@ -308,32 +307,6 @@ async def timeout_task( return Task.model_validate(updated) -@router.post( - "/{task_id}/signal", - response_model=Task, - summary="Signal Task", - description=( - "Dispatch a Temporal signal to the task's workflow. The workflow " - "must have a `@workflow.signal(name=...)` handler registered for " - "the supplied `signal_name`; the optional `payload` is forwarded " - "as the signal's single argument. Returns 4xx if the task isn't " - "currently RUNNING." - ), -) -async def signal_task( - task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.update), - request: SignalTaskRequest, - task_use_case: DTaskUseCase, -) -> Task: - updated = await task_use_case.signal_task( - id=task_id, - signal_name=request.signal_name, - payload=request.payload, - merge_params=request.merge_params, - ) - return Task.model_validate(updated) - - @router.get( "/{task_id}/stream", summary="Stream Task Events by ID", diff --git a/agentex/src/api/schemas/tasks.py b/agentex/src/api/schemas/tasks.py index f66f2569..84ee031a 100644 --- a/agentex/src/api/schemas/tasks.py +++ b/agentex/src/api/schemas/tasks.py @@ -93,35 +93,3 @@ class TaskStatusReasonRequest(BaseModel): None, title="Optional reason for the status change", ) - - -class SignalTaskRequest(BaseModel): - """Dispatch a Temporal signal to a running task's workflow. - - The workflow must register a matching `@workflow.signal(name=...)` - handler for the supplied ``signal_name``. ``payload`` is forwarded as - the signal's single argument; the workflow is responsible for any - shape validation. - - ``merge_params`` is an optional shallow-merge into the task's stored - ``params`` JSONB column. Used by live-config flows that want the - persisted task row to reflect the new config alongside the workflow - signal (e.g. ConfigModal Save). - """ - - signal_name: str = Field( - ..., - title="Name of the Temporal signal to dispatch", - ) - payload: dict[str, Any] | None = Field( - None, - title="Optional JSON payload forwarded to the workflow's signal handler", - ) - merge_params: dict[str, Any] | None = Field( - None, - title=( - "Optional shallow-merge patch applied to the task's params column " - "after the signal succeeds. Top-level keys overwrite; pass full " - "nested objects to change subfields." - ), - ) diff --git a/agentex/src/domain/repositories/task_repository.py b/agentex/src/domain/repositories/task_repository.py index 96949bc3..45662015 100644 --- a/agentex/src/domain/repositories/task_repository.py +++ b/agentex/src/domain/repositories/task_repository.py @@ -227,7 +227,8 @@ async def merge_params(self, task_id: str, patch: dict) -> TaskEntity | None: Intended for surfaces that need ``tasks.params`` to reflect the latest agent config after a live edit (see - ``TasksUseCase.signal_task``); not a general-purpose updater. + ``TasksUseCase.update_mutable_fields_on_task``); not a + general-purpose updater. """ async with ( diff --git a/agentex/src/domain/use_cases/tasks_use_case.py b/agentex/src/domain/use_cases/tasks_use_case.py index 0eb13f53..dffc6ec3 100644 --- a/agentex/src/domain/use_cases/tasks_use_case.py +++ b/agentex/src/domain/use_cases/tasks_use_case.py @@ -3,7 +3,6 @@ from fastapi import Depends from src.adapters.crud_store.exceptions import ItemDoesNotExist -from src.adapters.temporal.adapter_temporal import DTemporalAdapter from src.domain.entities.tasks import TaskEntity, TaskRelationships, TaskStatus from src.domain.exceptions import ClientError from src.domain.services.task_service import DAgentTaskService @@ -20,10 +19,8 @@ class TasksUseCase: def __init__( self, task_service: DAgentTaskService, - temporal_adapter: DTemporalAdapter, ): self.task_service = task_service - self.temporal_adapter = temporal_adapter async def get_task( self, @@ -209,48 +206,5 @@ async def timeout_task( TaskStatus.TIMED_OUT, id=id, name=name, reason=reason ) - async def signal_task( - self, - id: str, - signal_name: str, - payload: dict[str, Any] | None = None, - merge_params: dict[str, Any] | None = None, - ) -> TaskEntity: - """Dispatch a Temporal signal to a running task's workflow. - - Validates that the task is currently RUNNING before signaling — - Temporal will raise its own ``WorkflowNotFoundError`` for closed - workflows, but returning a 4xx-mapped ``ClientError`` upfront is - friendlier than the bare-exception round-trip. The signal name + - payload are forwarded as-is; the workflow's registered - ``@workflow.signal`` handler owns shape validation. - - ``workflow_id`` matches the task id in this codebase (see - TasksService.create_task — Temporal workflows are started with - ``workflow_id=task.id``). - - Optional ``merge_params``: if supplied, shallow-merges into the - task's ``params`` JSONB column after the signal dispatch - succeeds. Used by live-config flows so the persisted task row - reflects the new agent config without waiting for the next task - to be created. Returns the task entity post-merge (or pre-merge - if no patch was given). - """ - task = await self.task_service.get_task(id=id) - if task.status != TaskStatus.RUNNING: - raise ClientError( - f"Task {id} is not running (status={task.status}); cannot signal." - ) - await self.temporal_adapter.signal_workflow( - workflow_id=id, - signal=signal_name, - arg=payload, - ) - if merge_params: - merged = await self.task_service.merge_task_params(id, merge_params) - if merged is not None: - return merged - return task - DTaskUseCase = Annotated[TasksUseCase, Depends(TasksUseCase)] From 4369c55ac1f53428561f71011cebd98a7e6b8264 Mon Sep 17 00:00:00 2001 From: Michael Chou Date: Thu, 18 Jun 2026 16:23:39 -0700 Subject: [PATCH 4/4] comments --- agentex/openapi.yaml | 2 -- agentex/src/api/schemas/tasks.py | 3 +-- agentex/src/domain/services/task_service.py | 9 ++++----- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/agentex/openapi.yaml b/agentex/openapi.yaml index 0716710e..caa12c7f 100644 --- a/agentex/openapi.yaml +++ b/agentex/openapi.yaml @@ -6667,8 +6667,6 @@ components: - type: 'null' title: Optional shallow-merge patch applied to the task's params column. Top-level keys overwrite; pass full nested objects to change subfields. - The worker is expected to re-read task.params at turn boundaries to pick - up the new values. type: object title: UpdateTaskRequest ValidationError: diff --git a/agentex/src/api/schemas/tasks.py b/agentex/src/api/schemas/tasks.py index 84ee031a..74c8d83d 100644 --- a/agentex/src/api/schemas/tasks.py +++ b/agentex/src/api/schemas/tasks.py @@ -82,8 +82,7 @@ class UpdateTaskRequest(BaseModel): title=( "Optional shallow-merge patch applied to the task's params column. " "Top-level keys overwrite; pass full nested objects to change " - "subfields. The worker is expected to re-read task.params at turn " - "boundaries to pick up the new values." + "subfields." ), ) diff --git a/agentex/src/domain/services/task_service.py b/agentex/src/domain/services/task_service.py index 9cdee218..299494b8 100644 --- a/agentex/src/domain/services/task_service.py +++ b/agentex/src/domain/services/task_service.py @@ -243,12 +243,11 @@ async def update_task(self, task: TaskEntity) -> TaskEntity: return updated_task async def merge_task_params(self, task_id: str, patch: dict) -> TaskEntity | None: - """Atomically merge ``patch`` into ``tasks.params``. Returns the - updated entity, or ``None`` if no task with ``task_id`` exists. + """Atomically shallow-merge ``patch`` into ``tasks.params``. Returns + the updated entity, or ``None`` if no task with ``task_id`` exists. - Used by live-config flows (e.g. ConfigModal Save → signal + - DB persistence) so the persisted task row reflects the new - agent config without waiting for the next task to be created. + Lets callers update agent config on the task row in place; the agent + picks up the new values when it next reads ``task.params``. """ return await self.task_repository.merge_params(task_id, patch)