From b19f291f7772a0673fee7fab8d8c33af6e108812 Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Sun, 28 Jun 2026 00:46:17 +0300 Subject: [PATCH 1/3] docs: document wait until and interrupt timeouts --- docs/human_in_the_loop.md | 51 +++++++++++ samples/README.md | 6 ++ .../invoke-process-timeout-agent/README.md | 7 ++ .../bindings.json | 26 ++++++ samples/invoke-process-timeout-agent/graph.py | 43 +++++++++ .../langgraph.json | 7 ++ .../pyproject.toml | 15 ++++ .../invoke-process-timeout-agent/uipath.json | 14 +++ samples/wait-until-agent/README.md | 7 ++ samples/wait-until-agent/bindings.json | 4 + samples/wait-until-agent/graph.py | 31 +++++++ samples/wait-until-agent/langgraph.json | 7 ++ samples/wait-until-agent/pyproject.toml | 15 ++++ samples/wait-until-agent/uipath.json | 14 +++ tests/hitl/mocks/time_trigger_hitl.py | 33 +++++++ tests/runtime/test_resumable.py | 90 +++++++++++++++++++ 16 files changed, 370 insertions(+) create mode 100644 samples/invoke-process-timeout-agent/README.md create mode 100644 samples/invoke-process-timeout-agent/bindings.json create mode 100644 samples/invoke-process-timeout-agent/graph.py create mode 100644 samples/invoke-process-timeout-agent/langgraph.json create mode 100644 samples/invoke-process-timeout-agent/pyproject.toml create mode 100644 samples/invoke-process-timeout-agent/uipath.json create mode 100644 samples/wait-until-agent/README.md create mode 100644 samples/wait-until-agent/bindings.json create mode 100644 samples/wait-until-agent/graph.py create mode 100644 samples/wait-until-agent/langgraph.json create mode 100644 samples/wait-until-agent/pyproject.toml create mode 100644 samples/wait-until-agent/uipath.json create mode 100644 tests/hitl/mocks/time_trigger_hitl.py diff --git a/docs/human_in_the_loop.md b/docs/human_in_the_loop.md index 678b4530e..ac12d09f4 100644 --- a/docs/human_in_the_loop.md +++ b/docs/human_in_the_loop.md @@ -468,6 +468,57 @@ For a practical implementation, refer to the [email-triage-agent sample](https:/ --- +## Time triggers + +### WaitUntil + +Suspends the agent until an Orchestrator timer fires. The SDK stores the resume timestamp as part of the resume condition, and Orchestrator resumes the suspended job when that time is reached. + +| Attribute | Type | Description | +| --- | --- | --- | +| `resume_time` | `datetime` | Absolute time when Orchestrator should resume the suspended job. Use a timezone-aware `datetime`; it is normalized to UTC before the trigger is created. | + +```python +from langgraph.types import interrupt +from uipath.platform.common import WaitUntil +from datetime import datetime, timedelta, timezone + +resume_payload = interrupt( + WaitUntil( + resume_time=datetime.now(timezone.utc) + timedelta(minutes=5), + ) +) +``` + +This is intended for deployed jobs where Orchestrator owns the scheduler. In a local run there is no scheduler to fire the time trigger, so resume the interrupt explicitly when testing locally. + +--- + +## Interrupt timeouts + +Typed interrupt models can also be given a timeout. The SDK creates both the requested resume trigger and a timer resume trigger; whichever fires first resumes the agent. Timeout resume values use reserved UiPath metadata so they cannot collide with user payload fields. + +```python +from langgraph.types import interrupt +from uipath.platform.common import InvokeProcess +from uipath.platform.resume_triggers import assert_no_timeout, is_timeout + +result = interrupt( + InvokeProcess( + name="long-running-agent", + input_arguments={"message": "start"}, + timeout=10, + ) +) + +if is_timeout(result): + result = retry_or_fallback() + +result = assert_no_timeout(result) +``` + +--- + ## Resuming with a plain value (API trigger) All the models above are typed interrupts that tie the agent's wait state to a specific UiPath operation. When you call `interrupt(...)` with a value that is **not** one of those models, most commonly a plain string but any JSON-serializable value works, the SDK creates an **API resume trigger**. The agent suspends and waits to be resumed by an explicit external API call rather than by polling a UiPath operation. diff --git a/samples/README.md b/samples/README.md index 8a3017f32..728504d94 100644 --- a/samples/README.md +++ b/samples/README.md @@ -15,6 +15,12 @@ This sample shows how to automate Outlook inbox organization with AI-powered rul ## [HITL inbox server](hitl-inbox-server) This sample demonstrates a FastAPI server for managing human-in-the-loop workflows with job submissions and inbox message approvals. +## [Wait until agent](wait-until-agent) +This sample demonstrates a LangGraph agent that suspends with `WaitUntil` and resumes when Orchestrator fires a timer resume trigger. + +## [Invoke process timeout agent](invoke-process-timeout-agent) +This sample demonstrates `InvokeProcess(..., timeout=...)` and handling timeout resume values with the timeout helpers. + ## [Multi agent supervisor, researcher, coder](multi-agent-supervisor-researcher-coder) This sample showcases a multi-agent system, involving a supervisor, a researcher, and a coder working in coordination to tackle complex tasks. diff --git a/samples/invoke-process-timeout-agent/README.md b/samples/invoke-process-timeout-agent/README.md new file mode 100644 index 000000000..75fff031a --- /dev/null +++ b/samples/invoke-process-timeout-agent/README.md @@ -0,0 +1,7 @@ +# Invoke Process Timeout Agent + +This sample demonstrates timeout handling for typed interrupts. + +The parent graph invokes `CHILD_PROCESS_NAME = "timeout-child-agent"` with `InvokeProcess(..., timeout=600)`. If that process does not complete before the timeout, Orchestrator resumes the parent through the timer trigger first. The parent uses `assert_no_timeout`, which raises `UiPathTimeoutError` on timeout. + +The child process is declared in `bindings.json`. The sample code passes `process_folder_path="Shared"` when invoking the child process so the process lookup is explicit and can be changed with the binding to any process in the target organization. diff --git a/samples/invoke-process-timeout-agent/bindings.json b/samples/invoke-process-timeout-agent/bindings.json new file mode 100644 index 000000000..8c959eb45 --- /dev/null +++ b/samples/invoke-process-timeout-agent/bindings.json @@ -0,0 +1,26 @@ +{ + "version": "2.0", + "resources": [ + { + "resource": "process", + "key": "timeout-child-agent.Shared", + "value": { + "name": { + "defaultValue": "timeout-child-agent", + "isExpression": false, + "displayName": "Process Name" + }, + "folderPath": { + "defaultValue": "Shared", + "isExpression": false, + "displayName": "Process Folder Path" + } + }, + "metadata": { + "ActivityName": "InvokeProcess", + "BindingsVersion": "2.2", + "DisplayLabel": "timeout-child-agent" + } + } + ] +} diff --git a/samples/invoke-process-timeout-agent/graph.py b/samples/invoke-process-timeout-agent/graph.py new file mode 100644 index 000000000..6d7cae254 --- /dev/null +++ b/samples/invoke-process-timeout-agent/graph.py @@ -0,0 +1,43 @@ +from typing import TypedDict + +from langgraph.graph import END, START, StateGraph +from langgraph.types import interrupt +from uipath.platform.common import InvokeProcess +from uipath.platform.resume_triggers import assert_no_timeout + +CHILD_PROCESS_NAME = "timeout-child-agent" +PROCESS_FOLDER_PATH = "Shared" + + +class ParentState(TypedDict, total=False): + message: str + status: str + child_result: str + timeout: dict[str, object] + + +def parent_node(state: ParentState) -> ParentState: + child_result = interrupt( + InvokeProcess( + name=CHILD_PROCESS_NAME, + process_folder_path=PROCESS_FOLDER_PATH, + input_arguments={"message": state.get("message", "start child work")}, + # allows up to 10 minutes for the child process to finish. + timeout=600, + ) + ) + + # Raises UiPathTimeoutError on timeout. + assert_no_timeout(child_result) + + return { + "status": "completed", + "child_result": str(child_result), + } + + +parent_builder = StateGraph(ParentState) +parent_builder.add_node("parent", parent_node) +parent_builder.add_edge(START, "parent") +parent_builder.add_edge("parent", END) +parent_graph = parent_builder.compile() diff --git a/samples/invoke-process-timeout-agent/langgraph.json b/samples/invoke-process-timeout-agent/langgraph.json new file mode 100644 index 000000000..c12bdf184 --- /dev/null +++ b/samples/invoke-process-timeout-agent/langgraph.json @@ -0,0 +1,7 @@ +{ + "dependencies": ["."], + "graphs": { + "timeout-parent-agent": "./graph.py:parent_graph" + }, + "env": ".env" +} diff --git a/samples/invoke-process-timeout-agent/pyproject.toml b/samples/invoke-process-timeout-agent/pyproject.toml new file mode 100644 index 000000000..9e68bfb40 --- /dev/null +++ b/samples/invoke-process-timeout-agent/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "invoke-process-timeout-agent" +version = "0.0.1" +description = "LangGraph sample showing InvokeProcess timeout handling" +authors = [{ name = "John Doe", email = "john.doe@myemail.com" }] +requires-python = ">=3.11" +dependencies = [ + "uipath", + "uipath-langchain", +] + +[dependency-groups] +dev = [ + "uipath-dev", +] diff --git a/samples/invoke-process-timeout-agent/uipath.json b/samples/invoke-process-timeout-agent/uipath.json new file mode 100644 index 000000000..52263d59a --- /dev/null +++ b/samples/invoke-process-timeout-agent/uipath.json @@ -0,0 +1,14 @@ +{ + "$schema": "https://cloud.uipath.com/draft/2024-12/uipath", + "runtimeOptions": { + "isConversational": false + }, + "packOptions": { + "fileExtensionsIncluded": [], + "filesIncluded": [], + "filesExcluded": [], + "directoriesExcluded": [], + "includeUvLock": true + }, + "functions": {} +} diff --git a/samples/wait-until-agent/README.md b/samples/wait-until-agent/README.md new file mode 100644 index 000000000..5ca6e0d76 --- /dev/null +++ b/samples/wait-until-agent/README.md @@ -0,0 +1,7 @@ +# Wait Until Agent + +This sample demonstrates a LangGraph agent that suspends with `WaitUntil` and resumes when Orchestrator fires the timer resume trigger. + +The resume time is an absolute timezone-aware `datetime`. The SDK normalizes it to UTC before creating the timer trigger. + +The sample includes an empty `bindings.json` file so the project has the same deployable shape as samples that declare resource bindings. diff --git a/samples/wait-until-agent/bindings.json b/samples/wait-until-agent/bindings.json new file mode 100644 index 000000000..5e9beeb01 --- /dev/null +++ b/samples/wait-until-agent/bindings.json @@ -0,0 +1,4 @@ +{ + "version": "2.0", + "resources": [] +} diff --git a/samples/wait-until-agent/graph.py b/samples/wait-until-agent/graph.py new file mode 100644 index 000000000..5ce69ae39 --- /dev/null +++ b/samples/wait-until-agent/graph.py @@ -0,0 +1,31 @@ +from datetime import datetime, timedelta, timezone +from typing import TypedDict + +from langgraph.graph import END, START, StateGraph +from langgraph.types import interrupt +from uipath.platform.common import WaitUntil + + +class State(TypedDict, total=False): + message: str + resumed_at: str + resume_payload: str + + +def wait_until_node(state: State) -> State: + resume_time = datetime.now(timezone.utc) + timedelta(minutes=5) + resume_payload = interrupt(WaitUntil(resume_time=resume_time)) + + return { + "message": state.get("message", "wait completed"), + "resumed_at": datetime.now(timezone.utc).isoformat(), + "resume_payload": str(resume_payload), + } + + +builder = StateGraph(State) +builder.add_node("wait_until", wait_until_node) +builder.add_edge(START, "wait_until") +builder.add_edge("wait_until", END) + +graph = builder.compile() diff --git a/samples/wait-until-agent/langgraph.json b/samples/wait-until-agent/langgraph.json new file mode 100644 index 000000000..44a30fb6d --- /dev/null +++ b/samples/wait-until-agent/langgraph.json @@ -0,0 +1,7 @@ +{ + "dependencies": ["."], + "graphs": { + "wait-until-agent": "./graph.py:graph" + }, + "env": ".env" +} diff --git a/samples/wait-until-agent/pyproject.toml b/samples/wait-until-agent/pyproject.toml new file mode 100644 index 000000000..45029f2c8 --- /dev/null +++ b/samples/wait-until-agent/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "wait-until-agent" +version = "0.0.1" +description = "LangGraph sample showing WaitUntil timer-based resume triggers" +authors = [{ name = "John Doe", email = "john.doe@myemail.com" }] +requires-python = ">=3.11" +dependencies = [ + "uipath", + "uipath-langchain", +] + +[dependency-groups] +dev = [ + "uipath-dev", +] diff --git a/samples/wait-until-agent/uipath.json b/samples/wait-until-agent/uipath.json new file mode 100644 index 000000000..52263d59a --- /dev/null +++ b/samples/wait-until-agent/uipath.json @@ -0,0 +1,14 @@ +{ + "$schema": "https://cloud.uipath.com/draft/2024-12/uipath", + "runtimeOptions": { + "isConversational": false + }, + "packOptions": { + "fileExtensionsIncluded": [], + "filesIncluded": [], + "filesExcluded": [], + "directoriesExcluded": [], + "includeUvLock": true + }, + "functions": {} +} diff --git a/tests/hitl/mocks/time_trigger_hitl.py b/tests/hitl/mocks/time_trigger_hitl.py new file mode 100644 index 000000000..6e5f638d6 --- /dev/null +++ b/tests/hitl/mocks/time_trigger_hitl.py @@ -0,0 +1,33 @@ +from datetime import datetime, timedelta, timezone +from typing import TypedDict + +from langgraph.checkpoint.memory import MemorySaver +from langgraph.graph import END, START, StateGraph +from langgraph.types import interrupt +from uipath.platform.common import WaitUntil + + +class State(TypedDict): + message: str + + +def main_node(state: State) -> State: + response = interrupt( + WaitUntil( + resume_time=datetime.now(timezone.utc) + timedelta(minutes=1), + ) + ) + return {"message": str(response)} + + +builder: StateGraph[State] = StateGraph(State) + +builder.add_node("main_node", main_node) + +builder.add_edge(START, "main_node") +builder.add_edge("main_node", END) + + +memory = MemorySaver() + +graph = builder.compile(checkpointer=memory) diff --git a/tests/runtime/test_resumable.py b/tests/runtime/test_resumable.py index 4c0e8a036..1eef90e56 100644 --- a/tests/runtime/test_resumable.py +++ b/tests/runtime/test_resumable.py @@ -1,11 +1,13 @@ import os import tempfile +from datetime import datetime, timedelta, timezone from typing import Any, TypedDict import pytest from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver from langgraph.graph import END, START, StateGraph from langgraph.types import interrupt +from pydantic import BaseModel, ConfigDict, Field from uipath.core.errors import ErrorCategory, UiPathPendingTriggerError from uipath.core.triggers import ( UiPathResumeTrigger, @@ -86,6 +88,94 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any: return f"Response for {branch_name}" +class WaitUntilLikeInterrupt(BaseModel): + """Timer-shaped model used to verify typed interrupt pass-through.""" + + resume_time: datetime = Field(alias="resumeTime") + + model_config = ConfigDict(validate_by_name=True) + + +class CapturingTimerTriggerHandler: + """Mock implementation that captures typed time trigger interrupt values.""" + + def __init__(self): + self.suspend_values: list[WaitUntilLikeInterrupt] = [] + + async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: + """Create a timer trigger from the typed suspend value.""" + assert isinstance(suspend_value, WaitUntilLikeInterrupt) + self.suspend_values.append(suspend_value) + return UiPathResumeTrigger( + trigger_type=UiPathResumeTriggerType.TIMER, + trigger_name=UiPathResumeTriggerName.TIMER, + payload=suspend_value.model_dump(by_alias=True), + ) + + async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any: + """Timer triggers are resumed externally and remain pending locally.""" + raise UiPathPendingTriggerError( + ErrorCategory.SYSTEM, "Time trigger is still pending" + ) + + +@pytest.mark.asyncio +async def test_typed_time_interrupt_is_forwarded_to_trigger_handler(): + """LangGraph interrupt values are passed through for timer-shaped models.""" + resume_at = datetime.now(timezone.utc) + timedelta(minutes=5) + + class State(TypedDict, total=False): + message: str | None + + def wait_for_time(state: State) -> State: + result = interrupt( + WaitUntilLikeInterrupt( + resume_time=resume_at, + ) + ) + return {"message": f"resumed with: {result}"} + + graph = StateGraph(State) + graph.add_node("wait_for_time", wait_for_time) + graph.add_edge(START, "wait_for_time") + graph.add_edge("wait_for_time", END) + + temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + temp_db.close() + + try: + async with AsyncSqliteSaver.from_conn_string(temp_db.name) as memory: + compiled_graph = graph.compile(checkpointer=memory) + base_runtime = UiPathLangGraphRuntime( + graph=compiled_graph, + runtime_id="time-trigger-pass-through-test", + entrypoint="test", + ) + storage = SqliteResumableStorage(memory) + trigger_handler = CapturingTimerTriggerHandler() + runtime = UiPathResumableRuntime( + delegate=base_runtime, + storage=storage, + trigger_manager=trigger_handler, + runtime_id="time-trigger-pass-through-test", + ) + + result = await runtime.execute( + input={"message": None}, + options=UiPathExecuteOptions(resume=False), + ) + + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert len(trigger_handler.suspend_values) == 1 + suspend_value = trigger_handler.suspend_values[0] + assert suspend_value.resume_time == resume_at + assert result.triggers is not None + assert result.triggers[0].trigger_type == UiPathResumeTriggerType.TIMER + assert result.triggers[0].payload == {"resumeTime": resume_at} + finally: + os.unlink(temp_db.name) + + @pytest.mark.asyncio async def test_parallel_branches_with_sequential_trigger_resolution(): """Test graph execution with parallel branches where triggers resolve sequentially.""" From c343656ff48aaaf801b203479c75c9a46715be34 Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Mon, 29 Jun 2026 16:53:34 +0300 Subject: [PATCH 2/3] Make resume trigger storage atomic --- src/uipath_langchain/runtime/storage.py | 51 ++++++++++++++----------- tests/runtime/test_storage.py | 48 +++++++++++++++++++++++ 2 files changed, 76 insertions(+), 23 deletions(-) diff --git a/src/uipath_langchain/runtime/storage.py b/src/uipath_langchain/runtime/storage.py index b585dad0c..88bce1ba7 100644 --- a/src/uipath_langchain/runtime/storage.py +++ b/src/uipath_langchain/runtime/storage.py @@ -74,33 +74,38 @@ async def save_triggers( await self._ensure_table() async with self.memory.lock, self.memory.conn.cursor() as cur: - # Delete all existing triggers for this runtime_id - await cur.execute( - f""" - DELETE FROM {self.rs_table_name} - WHERE runtime_id = ? - """, - (runtime_id,), - ) - - for trigger in triggers: - trigger_data = trigger.model_dump() - trigger_data["payload"] = trigger.payload - trigger_data["trigger_name"] = trigger.trigger_name - + try: + await cur.execute("BEGIN IMMEDIATE") + # Delete all existing triggers for this runtime_id await cur.execute( f""" - INSERT INTO {self.rs_table_name} - (runtime_id, interrupt_id, data) - VALUES (?, ?, ?) + DELETE FROM {self.rs_table_name} + WHERE runtime_id = ? """, - ( - runtime_id, - trigger.interrupt_id, - serialize_json(trigger_data), - ), + (runtime_id,), ) - await self.memory.conn.commit() + + for trigger in triggers: + trigger_data = trigger.model_dump() + trigger_data["payload"] = trigger.payload + trigger_data["trigger_name"] = trigger.trigger_name + + await cur.execute( + f""" + INSERT INTO {self.rs_table_name} + (runtime_id, interrupt_id, data) + VALUES (?, ?, ?) + """, + ( + runtime_id, + trigger.interrupt_id, + serialize_json(trigger_data), + ), + ) + await self.memory.conn.commit() + except Exception: + await self.memory.conn.rollback() + raise async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None: """Get all triggers for runtime_id from database.""" diff --git a/tests/runtime/test_storage.py b/tests/runtime/test_storage.py index 6bd1bf385..fe8df41ca 100644 --- a/tests/runtime/test_storage.py +++ b/tests/runtime/test_storage.py @@ -13,6 +13,7 @@ UiPathResumeTriggerType, ) +from uipath_langchain.runtime import storage as storage_module from uipath_langchain.runtime.storage import SqliteResumableStorage @@ -167,6 +168,53 @@ async def test_save_empty_list_deletes_triggers( triggers = await storage.get_triggers("runtime1") assert triggers is None + @pytest.mark.asyncio + async def test_save_triggers_rolls_back_on_insert_failure( + self, storage: SqliteResumableStorage, monkeypatch: pytest.MonkeyPatch + ): + """Test that trigger replacement is all-or-nothing.""" + existing_trigger = UiPathResumeTrigger( + interrupt_id="existing", + trigger_type=UiPathResumeTriggerType.API, + trigger_name=UiPathResumeTriggerName.API, + payload="existing payload", + ) + await storage.save_triggers("runtime1", [existing_trigger]) + + new_trigger1 = UiPathResumeTrigger( + interrupt_id="new1", + trigger_type=UiPathResumeTriggerType.JOB, + trigger_name=UiPathResumeTriggerName.JOB, + payload="new payload 1", + ) + new_trigger2 = UiPathResumeTrigger( + interrupt_id="new2", + trigger_type=UiPathResumeTriggerType.TIMER, + trigger_name=UiPathResumeTriggerName.TIMER, + payload="new payload 2", + ) + + original_serialize_json = storage_module.serialize_json + call_count = 0 + + def fail_on_second_trigger(value: Any) -> str: + nonlocal call_count + call_count += 1 + if call_count == 2: + raise RuntimeError("serialization failed") + return original_serialize_json(value) + + monkeypatch.setattr(storage_module, "serialize_json", fail_on_second_trigger) + + with pytest.raises(RuntimeError, match="serialization failed"): + await storage.save_triggers("runtime1", [new_trigger1, new_trigger2]) + + triggers = await storage.get_triggers("runtime1") + assert triggers is not None + assert len(triggers) == 1 + assert triggers[0].interrupt_id == "existing" + assert triggers[0].payload == "existing payload" + @pytest.mark.asyncio async def test_delete_trigger(self, storage: SqliteResumableStorage): """Test deleting a specific trigger.""" From 23125f2e638c0a17986884692599a5da306f5861 Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Tue, 30 Jun 2026 11:51:21 +0300 Subject: [PATCH 3/3] docs: show timer race interrupt composition --- docs/human_in_the_loop.md | 29 ++++++------ samples/README.md | 4 +- .../invoke-process-timeout-agent/README.md | 7 --- samples/invoke-process-timeout-agent/graph.py | 43 ----------------- .../invoke-process-timer-race-agent/README.md | 7 +++ .../bindings.json | 0 .../invoke-process-timer-race-agent/graph.py | 46 +++++++++++++++++++ .../langgraph.json | 2 +- .../pyproject.toml | 4 +- .../uipath.json | 0 10 files changed, 74 insertions(+), 68 deletions(-) delete mode 100644 samples/invoke-process-timeout-agent/README.md delete mode 100644 samples/invoke-process-timeout-agent/graph.py create mode 100644 samples/invoke-process-timer-race-agent/README.md rename samples/{invoke-process-timeout-agent => invoke-process-timer-race-agent}/bindings.json (100%) create mode 100644 samples/invoke-process-timer-race-agent/graph.py rename samples/{invoke-process-timeout-agent => invoke-process-timer-race-agent}/langgraph.json (52%) rename samples/{invoke-process-timeout-agent => invoke-process-timer-race-agent}/pyproject.toml (64%) rename samples/{invoke-process-timeout-agent => invoke-process-timer-race-agent}/uipath.json (100%) diff --git a/docs/human_in_the_loop.md b/docs/human_in_the_loop.md index ac12d09f4..148279905 100644 --- a/docs/human_in_the_loop.md +++ b/docs/human_in_the_loop.md @@ -494,27 +494,30 @@ This is intended for deployed jobs where Orchestrator owns the scheduler. In a l --- -## Interrupt timeouts +## Racing Multiple Resume Conditions -Typed interrupt models can also be given a timeout. The SDK creates both the requested resume trigger and a timer resume trigger; whichever fires first resumes the agent. Timeout resume values use reserved UiPath metadata so they cannot collide with user payload fields. +Pass a list of interrupt models to suspend on multiple resume conditions at once. The SDK creates sibling triggers for the same interrupt; whichever trigger fires first resumes the agent, and Orchestrator removes the remaining sibling triggers for that interrupt. ```python +from datetime import datetime, timedelta, timezone + from langgraph.types import interrupt -from uipath.platform.common import InvokeProcess -from uipath.platform.resume_triggers import assert_no_timeout, is_timeout +from uipath.platform.common import InvokeProcess, WaitUntil result = interrupt( - InvokeProcess( - name="long-running-agent", - input_arguments={"message": "start"}, - timeout=10, - ) + [ + InvokeProcess( + name="long-running-agent", + input_arguments={"message": "start"}, + ), + WaitUntil( + resume_time=datetime.now(timezone.utc) + timedelta(minutes=10), + ), + ] ) -if is_timeout(result): - result = retry_or_fallback() - -result = assert_no_timeout(result) +if isinstance(result, dict) and "resumeTime" in result: + raise TimeoutError("Process did not finish before the timer fired.") ``` --- diff --git a/samples/README.md b/samples/README.md index 728504d94..52db1a128 100644 --- a/samples/README.md +++ b/samples/README.md @@ -18,8 +18,8 @@ This sample demonstrates a FastAPI server for managing human-in-the-loop workflo ## [Wait until agent](wait-until-agent) This sample demonstrates a LangGraph agent that suspends with `WaitUntil` and resumes when Orchestrator fires a timer resume trigger. -## [Invoke process timeout agent](invoke-process-timeout-agent) -This sample demonstrates `InvokeProcess(..., timeout=...)` and handling timeout resume values with the timeout helpers. +## [Invoke process timer race agent](invoke-process-timer-race-agent) +This sample demonstrates racing an `InvokeProcess` interrupt with a `WaitUntil` timer interrupt. ## [Multi agent supervisor, researcher, coder](multi-agent-supervisor-researcher-coder) This sample showcases a multi-agent system, involving a supervisor, a researcher, and a coder working in coordination to tackle complex tasks. diff --git a/samples/invoke-process-timeout-agent/README.md b/samples/invoke-process-timeout-agent/README.md deleted file mode 100644 index 75fff031a..000000000 --- a/samples/invoke-process-timeout-agent/README.md +++ /dev/null @@ -1,7 +0,0 @@ -# Invoke Process Timeout Agent - -This sample demonstrates timeout handling for typed interrupts. - -The parent graph invokes `CHILD_PROCESS_NAME = "timeout-child-agent"` with `InvokeProcess(..., timeout=600)`. If that process does not complete before the timeout, Orchestrator resumes the parent through the timer trigger first. The parent uses `assert_no_timeout`, which raises `UiPathTimeoutError` on timeout. - -The child process is declared in `bindings.json`. The sample code passes `process_folder_path="Shared"` when invoking the child process so the process lookup is explicit and can be changed with the binding to any process in the target organization. diff --git a/samples/invoke-process-timeout-agent/graph.py b/samples/invoke-process-timeout-agent/graph.py deleted file mode 100644 index 6d7cae254..000000000 --- a/samples/invoke-process-timeout-agent/graph.py +++ /dev/null @@ -1,43 +0,0 @@ -from typing import TypedDict - -from langgraph.graph import END, START, StateGraph -from langgraph.types import interrupt -from uipath.platform.common import InvokeProcess -from uipath.platform.resume_triggers import assert_no_timeout - -CHILD_PROCESS_NAME = "timeout-child-agent" -PROCESS_FOLDER_PATH = "Shared" - - -class ParentState(TypedDict, total=False): - message: str - status: str - child_result: str - timeout: dict[str, object] - - -def parent_node(state: ParentState) -> ParentState: - child_result = interrupt( - InvokeProcess( - name=CHILD_PROCESS_NAME, - process_folder_path=PROCESS_FOLDER_PATH, - input_arguments={"message": state.get("message", "start child work")}, - # allows up to 10 minutes for the child process to finish. - timeout=600, - ) - ) - - # Raises UiPathTimeoutError on timeout. - assert_no_timeout(child_result) - - return { - "status": "completed", - "child_result": str(child_result), - } - - -parent_builder = StateGraph(ParentState) -parent_builder.add_node("parent", parent_node) -parent_builder.add_edge(START, "parent") -parent_builder.add_edge("parent", END) -parent_graph = parent_builder.compile() diff --git a/samples/invoke-process-timer-race-agent/README.md b/samples/invoke-process-timer-race-agent/README.md new file mode 100644 index 000000000..5e9eba556 --- /dev/null +++ b/samples/invoke-process-timer-race-agent/README.md @@ -0,0 +1,7 @@ +# Invoke Process Timer Race Agent + +This sample demonstrates racing a child process interrupt with a timer interrupt. + +The parent graph suspends with `interrupt([InvokeProcess(...), WaitUntil(...)])`. If `CHILD_PROCESS_NAME = "timeout-child-agent"` does not complete before the timer fires, Orchestrator resumes the parent through the timer trigger first and the sample raises `TimeoutError`. + +The child process is declared in `bindings.json`. The sample code passes `process_folder_path="Shared"` when invoking the child process so the process lookup is explicit and can be changed with the binding to any process in the target organization. diff --git a/samples/invoke-process-timeout-agent/bindings.json b/samples/invoke-process-timer-race-agent/bindings.json similarity index 100% rename from samples/invoke-process-timeout-agent/bindings.json rename to samples/invoke-process-timer-race-agent/bindings.json diff --git a/samples/invoke-process-timer-race-agent/graph.py b/samples/invoke-process-timer-race-agent/graph.py new file mode 100644 index 000000000..e4785b30b --- /dev/null +++ b/samples/invoke-process-timer-race-agent/graph.py @@ -0,0 +1,46 @@ +from datetime import datetime, timedelta, timezone +from typing import TypedDict + +from langgraph.graph import END, START, StateGraph +from langgraph.types import interrupt +from uipath.platform.common import InvokeProcess, WaitUntil + +CHILD_PROCESS_NAME = "timeout-child-agent" +PROCESS_FOLDER_PATH = "Shared" + + +class ParentState(TypedDict, total=False): + message: str + status: str + child_result: str + + +def parent_node(state: ParentState) -> ParentState: + child_result = interrupt( + [ + InvokeProcess( + name=CHILD_PROCESS_NAME, + process_folder_path=PROCESS_FOLDER_PATH, + input_arguments={"message": state.get("message", "start child work")}, + ), + WaitUntil( + # allows up to 10 minutes for the child process to finish. + resume_time=datetime.now(timezone.utc) + timedelta(minutes=10), + ), + ] + ) + + if isinstance(child_result, dict) and "resumeTime" in child_result: + raise TimeoutError("Child process did not finish before the timer fired.") + + return { + "status": "completed", + "child_result": str(child_result), + } + + +parent_builder = StateGraph(ParentState) +parent_builder.add_node("parent", parent_node) +parent_builder.add_edge(START, "parent") +parent_builder.add_edge("parent", END) +parent_graph = parent_builder.compile() diff --git a/samples/invoke-process-timeout-agent/langgraph.json b/samples/invoke-process-timer-race-agent/langgraph.json similarity index 52% rename from samples/invoke-process-timeout-agent/langgraph.json rename to samples/invoke-process-timer-race-agent/langgraph.json index c12bdf184..02fc02ab6 100644 --- a/samples/invoke-process-timeout-agent/langgraph.json +++ b/samples/invoke-process-timer-race-agent/langgraph.json @@ -1,7 +1,7 @@ { "dependencies": ["."], "graphs": { - "timeout-parent-agent": "./graph.py:parent_graph" + "timer-race-parent-agent": "./graph.py:parent_graph" }, "env": ".env" } diff --git a/samples/invoke-process-timeout-agent/pyproject.toml b/samples/invoke-process-timer-race-agent/pyproject.toml similarity index 64% rename from samples/invoke-process-timeout-agent/pyproject.toml rename to samples/invoke-process-timer-race-agent/pyproject.toml index 9e68bfb40..90d48fae0 100644 --- a/samples/invoke-process-timeout-agent/pyproject.toml +++ b/samples/invoke-process-timer-race-agent/pyproject.toml @@ -1,7 +1,7 @@ [project] -name = "invoke-process-timeout-agent" +name = "invoke-process-timer-race-agent" version = "0.0.1" -description = "LangGraph sample showing InvokeProcess timeout handling" +description = "LangGraph sample showing InvokeProcess and WaitUntil timer racing" authors = [{ name = "John Doe", email = "john.doe@myemail.com" }] requires-python = ">=3.11" dependencies = [ diff --git a/samples/invoke-process-timeout-agent/uipath.json b/samples/invoke-process-timer-race-agent/uipath.json similarity index 100% rename from samples/invoke-process-timeout-agent/uipath.json rename to samples/invoke-process-timer-race-agent/uipath.json