Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions agentex/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6660,6 +6660,13 @@ components:
type: object
- type: 'null'
title: If provided, replaces task_metadata with this value
merge_params:
anyOf:
- additionalProperties: true
type: object
- type: 'null'
title: Optional shallow-merge patch applied to the task's params column.
Top-level keys overwrite; pass full nested objects to change subfields.
type: object
title: UpdateTaskRequest
ValidationError:
Expand Down
4 changes: 3 additions & 1 deletion agentex/src/api/routes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ async def update_task(
task_use_case: DTaskUseCase,
) -> Task:
updated_task_entity = await task_use_case.update_mutable_fields_on_task(
id=task_id, task_metadata=request.task_metadata
id=task_id,
task_metadata=request.task_metadata,
merge_params=request.merge_params,
)
return Task.model_validate(updated_task_entity)

Expand Down
8 changes: 8 additions & 0 deletions agentex/src/api/schemas/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ class UpdateTaskRequest(BaseModel):
None,
title="If provided, replaces task_metadata with this value",
)
merge_params: dict[str, Any] | None = Field(
None,
title=(
"Optional shallow-merge patch applied to the task's params column. "
"Top-level keys overwrite; pass full nested objects to change "
"subfields."
),
)


class TaskStatusReasonRequest(BaseModel):
Expand Down
43 changes: 42 additions & 1 deletion agentex/src/domain/repositories/task_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from typing import Annotated, Literal

from fastapi import Depends
from sqlalchemy import distinct, func, select, update
from sqlalchemy import cast, distinct, func, select, update
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import selectinload
from src.adapters.crud_store.adapter_postgres import (
ColumnPrimitiveValue,
Expand Down Expand Up @@ -213,6 +214,46 @@ async def update(self, task: TaskEntity) -> TaskEntity:
# Return with agents populated
return TaskEntity.model_validate(modified_orm)

async def merge_params(self, task_id: str, patch: dict) -> TaskEntity | None:
"""Atomically shallow-merge ``patch`` into the task's ``params``
JSONB column. Returns the updated entity, or ``None`` if no task
with that id exists.

Uses Postgres' ``||`` operator on JSONB so the read-modify-write
is collapsed to a single statement (no race with concurrent
writers). ``patch`` keys overwrite existing keys at the top
level; nested objects are NOT deep-merged — pass the full nested
replacement if you need to change a subfield.

Intended for surfaces that need ``tasks.params`` to reflect the
latest agent config after a live edit (see
``TasksUseCase.update_mutable_fields_on_task``); not a
general-purpose updater.
"""

async with (
self.start_async_db_session(True) as session,
async_sql_exception_handler(),
):
# ``COALESCE(params, '{}'::jsonb)`` so a NULL existing value
# doesn't poison the concat to NULL. Both operands cast to
# JSONB explicitly so Postgres picks the JSONB ``||`` operator
# (not the text concat overload).
existing = func.coalesce(TaskORM.params, cast({}, JSONB))
merged = existing.op("||", return_type=JSONB)(cast(patch, JSONB))
stmt = (
update(TaskORM)
.where(TaskORM.id == task_id)
.values(params=merged)
.returning(TaskORM)
)
result = await session.execute(stmt)
row = result.scalar_one_or_none()
await session.commit()
if row is None:
return None
return TaskEntity.model_validate(row)

async def transition_status(
self,
task_id: str,
Expand Down
9 changes: 9 additions & 0 deletions agentex/src/domain/services/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ async def update_task(self, task: TaskEntity) -> TaskEntity:

return updated_task

async def merge_task_params(self, task_id: str, patch: dict) -> TaskEntity | None:
"""Atomically shallow-merge ``patch`` into ``tasks.params``. Returns
the updated entity, or ``None`` if no task with ``task_id`` exists.

Lets callers update agent config on the task row in place; the agent
picks up the new values when it next reads ``task.params``.
"""
return await self.task_repository.merge_params(task_id, patch)
Comment on lines +245 to +252

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 merge_task_params skips the task_updated stream event

update_task always publishes a task_updated event to the task event stream after writing to the DB. merge_task_params only calls task_repository.merge_params and returns, with no equivalent stream publish. When a caller patches only merge_params (no task_metadata), the if task_metadata is not None branch in the use case is skipped, so update_task is never reached and no event is ever emitted. Any consumer watching the stream — including the Temporal worker the PR description says must receive a signal — will not be notified of the config change.

Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/domain/services/task_service.py
Line: 245-253

Comment:
**`merge_task_params` skips the `task_updated` stream event**

`update_task` always publishes a `task_updated` event to the task event stream after writing to the DB. `merge_task_params` only calls `task_repository.merge_params` and returns, with no equivalent stream publish. When a caller patches only `merge_params` (no `task_metadata`), the `if task_metadata is not None` branch in the use case is skipped, so `update_task` is never reached and no event is ever emitted. Any consumer watching the stream — including the Temporal worker the PR description says must receive a signal — will not be notified of the config change.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex


async def delete_task(self, id: str | None = None, name: str | None = None) -> None:
"""
Delete a task from the repository.
Expand Down
22 changes: 18 additions & 4 deletions agentex/src/domain/use_cases/tasks_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ async def update_mutable_fields_on_task(
id: str | None = None,
name: str | None = None,
task_metadata: dict[str, Any] | None = None,
merge_params: dict[str, Any] | None = None,
) -> TaskEntity:
"""Update mutable fields on a task entity. This is used by our API since not all fields should be mutable."""

Expand All @@ -108,15 +109,28 @@ async def update_mutable_fields_on_task(
else:
raise ItemDoesNotExist(f"Task {name} not found")

# if no mutations are provided, don't do anything
if task_metadata is None:
# No-op if neither field was supplied.
if task_metadata is None and merge_params is None:
return task_entity

if task_metadata is not None:
task_entity.task_metadata = task_metadata

updated_task_entity = await self.task_service.update_task(task=task_entity)
return updated_task_entity
# `merge_params` is a separate atomic JSONB shallow-merge so concurrent
# callers don't overwrite each other's fields (vs reading→mutating→writing
# the whole params dict on task_entity). Falls through to a normal save
# only when task_metadata also changed.
if merge_params:
merged = await self.task_service.merge_task_params(
task_entity.id, merge_params
)
if merged is not None:
task_entity = merged

if task_metadata is not None:
task_entity = await self.task_service.update_task(task=task_entity)
Comment thread
greptile-apps[bot] marked this conversation as resolved.

return task_entity

async def _transition_to_terminal(
self,
Expand Down
Loading