From f9e2f2c59ce8461b019e1a577236af279342ca92 Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Sat, 27 Jun 2026 22:26:30 +0300 Subject: [PATCH 1/2] docs(langchain): add time trigger interrupts --- docs/human_in_the_loop.md | 27 ++++++++ tests/hitl/mocks/time_trigger_hitl.py | 33 ++++++++++ tests/runtime/test_resumable.py | 94 +++++++++++++++++++++++++++ 3 files changed, 154 insertions(+) create mode 100644 tests/hitl/mocks/time_trigger_hitl.py diff --git a/docs/human_in_the_loop.md b/docs/human_in_the_loop.md index 678b4530e..822c3735b 100644 --- a/docs/human_in_the_loop.md +++ b/docs/human_in_the_loop.md @@ -468,6 +468,33 @@ For a practical implementation, refer to the [email-triage-agent sample](https:/ --- +## Time triggers + +### WaitTimeTrigger + +Suspends the agent until an Orchestrator time trigger fires. The SDK stores the schedule as part of the resume condition, and Orchestrator resumes the suspended job when the configured time trigger fires. + +| Attribute | Type | Description | +| --- | --- | --- | +| `cron_expression` | `str` | Quartz cron expression for the schedule. Use the same six-field shape accepted by `uip or triggers create --type time --cron`, for example `0 0 9 ? * MON-FRI`. | +| `time_zone_id` | `str` | IANA timezone used to evaluate the cron expression, for example `Europe/Bucharest`. Defaults to `UTC`. | + +```python +from langgraph.types import interrupt +from uipath.platform.common import WaitTimeTrigger + +resume_payload = interrupt( + WaitTimeTrigger( + cron_expression="0 0/5 * * * ?", + time_zone_id="Europe/Bucharest", + ) +) +``` + +This is intended for deployed jobs where Orchestrator owns the scheduler. In a local run there is no scheduler to fire the time trigger, so resume the interrupt explicitly when testing locally. + +--- + ## Resuming with a plain value (API trigger) All the models above are typed interrupts that tie the agent's wait state to a specific UiPath operation. When you call `interrupt(...)` with a value that is **not** one of those models, most commonly a plain string but any JSON-serializable value works, the SDK creates an **API resume trigger**. The agent suspends and waits to be resumed by an explicit external API call rather than by polling a UiPath operation. diff --git a/tests/hitl/mocks/time_trigger_hitl.py b/tests/hitl/mocks/time_trigger_hitl.py new file mode 100644 index 000000000..4fb925ad9 --- /dev/null +++ b/tests/hitl/mocks/time_trigger_hitl.py @@ -0,0 +1,33 @@ +from typing import TypedDict + +from langgraph.checkpoint.memory import MemorySaver +from langgraph.graph import END, START, StateGraph +from langgraph.types import interrupt +from uipath.platform.common import WaitTimeTrigger + + +class State(TypedDict): + message: str + + +def main_node(state: State) -> State: + response = interrupt( + WaitTimeTrigger( + cron_expression="0 0/5 * * * ?", + time_zone_id="Europe/Bucharest", + ) + ) + return {"message": str(response)} + + +builder: StateGraph[State] = StateGraph(State) + +builder.add_node("main_node", main_node) + +builder.add_edge(START, "main_node") +builder.add_edge("main_node", END) + + +memory = MemorySaver() + +graph = builder.compile(checkpointer=memory) diff --git a/tests/runtime/test_resumable.py b/tests/runtime/test_resumable.py index 4c0e8a036..4edb7490b 100644 --- a/tests/runtime/test_resumable.py +++ b/tests/runtime/test_resumable.py @@ -6,6 +6,7 @@ from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver from langgraph.graph import END, START, StateGraph from langgraph.types import interrupt +from pydantic import BaseModel, ConfigDict, Field from uipath.core.errors import ErrorCategory, UiPathPendingTriggerError from uipath.core.triggers import ( UiPathResumeTrigger, @@ -86,6 +87,99 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any: return f"Response for {branch_name}" +class TimerLikeInterrupt(BaseModel): + """Timer-shaped model used to verify typed interrupt pass-through.""" + + cron_expression: str = Field(alias="cronExpression") + time_zone_id: str = Field(default="UTC", alias="timeZoneId") + + model_config = ConfigDict(validate_by_name=True) + + +class CapturingTimerTriggerHandler: + """Mock implementation that captures typed time trigger interrupt values.""" + + def __init__(self): + self.suspend_values: list[TimerLikeInterrupt] = [] + + async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: + """Create a timer trigger from the typed suspend value.""" + assert isinstance(suspend_value, TimerLikeInterrupt) + self.suspend_values.append(suspend_value) + return UiPathResumeTrigger( + trigger_type=UiPathResumeTriggerType.TIMER, + trigger_name=UiPathResumeTriggerName.TIMER, + payload=suspend_value.model_dump(by_alias=True), + ) + + async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any: + """Timer triggers are resumed externally and remain pending locally.""" + raise UiPathPendingTriggerError( + ErrorCategory.SYSTEM, "Time trigger is still pending" + ) + + +@pytest.mark.asyncio +async def test_typed_time_interrupt_is_forwarded_to_trigger_handler(): + """LangGraph interrupt values are passed through for timer-shaped models.""" + + class State(TypedDict, total=False): + message: str | None + + def wait_for_time(state: State) -> State: + result = interrupt( + TimerLikeInterrupt( + cron_expression="0 0/5 * * * ?", + time_zone_id="Europe/Bucharest", + ) + ) + return {"message": f"resumed with: {result}"} + + graph = StateGraph(State) + graph.add_node("wait_for_time", wait_for_time) + graph.add_edge(START, "wait_for_time") + graph.add_edge("wait_for_time", END) + + temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + temp_db.close() + + try: + async with AsyncSqliteSaver.from_conn_string(temp_db.name) as memory: + compiled_graph = graph.compile(checkpointer=memory) + base_runtime = UiPathLangGraphRuntime( + graph=compiled_graph, + runtime_id="time-trigger-pass-through-test", + entrypoint="test", + ) + storage = SqliteResumableStorage(memory) + trigger_handler = CapturingTimerTriggerHandler() + runtime = UiPathResumableRuntime( + delegate=base_runtime, + storage=storage, + trigger_manager=trigger_handler, + runtime_id="time-trigger-pass-through-test", + ) + + result = await runtime.execute( + input={"message": None}, + options=UiPathExecuteOptions(resume=False), + ) + + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert len(trigger_handler.suspend_values) == 1 + suspend_value = trigger_handler.suspend_values[0] + assert suspend_value.cron_expression == "0 0/5 * * * ?" + assert suspend_value.time_zone_id == "Europe/Bucharest" + assert result.triggers is not None + assert result.triggers[0].trigger_type == UiPathResumeTriggerType.TIMER + assert result.triggers[0].payload == { + "cronExpression": "0 0/5 * * * ?", + "timeZoneId": "Europe/Bucharest", + } + finally: + os.unlink(temp_db.name) + + @pytest.mark.asyncio async def test_parallel_branches_with_sequential_trigger_resolution(): """Test graph execution with parallel branches where triggers resolve sequentially.""" From 10725f373860d1c584bf44d4d9c9e7593f02dd2f Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Sat, 27 Jun 2026 23:16:52 +0300 Subject: [PATCH 2/2] docs: use resume time for timer trigger examples --- docs/human_in_the_loop.md | 9 ++++----- tests/hitl/mocks/time_trigger_hitl.py | 4 ++-- tests/runtime/test_resumable.py | 17 +++++++++-------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/docs/human_in_the_loop.md b/docs/human_in_the_loop.md index 822c3735b..5bd35ec79 100644 --- a/docs/human_in_the_loop.md +++ b/docs/human_in_the_loop.md @@ -472,21 +472,20 @@ For a practical implementation, refer to the [email-triage-agent sample](https:/ ### WaitTimeTrigger -Suspends the agent until an Orchestrator time trigger fires. The SDK stores the schedule as part of the resume condition, and Orchestrator resumes the suspended job when the configured time trigger fires. +Suspends the agent until an Orchestrator timer fires. The SDK stores the resume timestamp as part of the resume condition, and Orchestrator resumes the suspended job when that time is reached. | Attribute | Type | Description | | --- | --- | --- | -| `cron_expression` | `str` | Quartz cron expression for the schedule. Use the same six-field shape accepted by `uip or triggers create --type time --cron`, for example `0 0 9 ? * MON-FRI`. | -| `time_zone_id` | `str` | IANA timezone used to evaluate the cron expression, for example `Europe/Bucharest`. Defaults to `UTC`. | +| `resume_time` | `datetime \| str` | Absolute time when Orchestrator should resume the suspended job. Use a timezone-aware UTC `datetime` or an ISO 8601 timestamp. | ```python from langgraph.types import interrupt from uipath.platform.common import WaitTimeTrigger +from datetime import datetime, timedelta, timezone resume_payload = interrupt( WaitTimeTrigger( - cron_expression="0 0/5 * * * ?", - time_zone_id="Europe/Bucharest", + resume_time=datetime.now(timezone.utc) + timedelta(minutes=5), ) ) ``` diff --git a/tests/hitl/mocks/time_trigger_hitl.py b/tests/hitl/mocks/time_trigger_hitl.py index 4fb925ad9..1267fca54 100644 --- a/tests/hitl/mocks/time_trigger_hitl.py +++ b/tests/hitl/mocks/time_trigger_hitl.py @@ -1,3 +1,4 @@ +from datetime import datetime, timedelta, timezone from typing import TypedDict from langgraph.checkpoint.memory import MemorySaver @@ -13,8 +14,7 @@ class State(TypedDict): def main_node(state: State) -> State: response = interrupt( WaitTimeTrigger( - cron_expression="0 0/5 * * * ?", - time_zone_id="Europe/Bucharest", + resume_time=datetime.now(timezone.utc) + timedelta(minutes=1), ) ) return {"message": str(response)} diff --git a/tests/runtime/test_resumable.py b/tests/runtime/test_resumable.py index 4edb7490b..71105846e 100644 --- a/tests/runtime/test_resumable.py +++ b/tests/runtime/test_resumable.py @@ -1,5 +1,6 @@ import os import tempfile +from datetime import datetime, timezone from typing import Any, TypedDict import pytest @@ -90,8 +91,7 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any: class TimerLikeInterrupt(BaseModel): """Timer-shaped model used to verify typed interrupt pass-through.""" - cron_expression: str = Field(alias="cronExpression") - time_zone_id: str = Field(default="UTC", alias="timeZoneId") + resume_time: datetime | str = Field(alias="resumeTime") model_config = ConfigDict(validate_by_name=True) @@ -129,8 +129,7 @@ class State(TypedDict, total=False): def wait_for_time(state: State) -> State: result = interrupt( TimerLikeInterrupt( - cron_expression="0 0/5 * * * ?", - time_zone_id="Europe/Bucharest", + resume_time=datetime(2026, 6, 27, 20, 14, 49, tzinfo=timezone.utc), ) ) return {"message": f"resumed with: {result}"} @@ -168,13 +167,15 @@ def wait_for_time(state: State) -> State: assert result.status == UiPathRuntimeStatus.SUSPENDED assert len(trigger_handler.suspend_values) == 1 suspend_value = trigger_handler.suspend_values[0] - assert suspend_value.cron_expression == "0 0/5 * * * ?" - assert suspend_value.time_zone_id == "Europe/Bucharest" + assert suspend_value.resume_time == datetime( + 2026, 6, 27, 20, 14, 49, tzinfo=timezone.utc + ) assert result.triggers is not None assert result.triggers[0].trigger_type == UiPathResumeTriggerType.TIMER assert result.triggers[0].payload == { - "cronExpression": "0 0/5 * * * ?", - "timeZoneId": "Europe/Bucharest", + "resumeTime": datetime( + 2026, 6, 27, 20, 14, 49, tzinfo=timezone.utc + ), } finally: os.unlink(temp_db.name)