Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions docs/human_in_the_loop.md
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,60 @@ For a practical implementation, refer to the [email-triage-agent sample](https:/

---

## Time triggers

### WaitUntil

Suspends the agent until an Orchestrator timer fires. The SDK stores the resume timestamp as part of the resume condition, and Orchestrator resumes the suspended job when that time is reached.

| Attribute | Type | Description |
| --- | --- | --- |
| `resume_time` | `datetime` | Absolute time when Orchestrator should resume the suspended job. Use a timezone-aware `datetime`; it is normalized to UTC before the trigger is created. |

```python
from langgraph.types import interrupt
from uipath.platform.common import WaitUntil
from datetime import datetime, timedelta, timezone

resume_payload = interrupt(
WaitUntil(
resume_time=datetime.now(timezone.utc) + timedelta(minutes=5),
)
)
```

This is intended for deployed jobs where Orchestrator owns the scheduler. In a local run there is no scheduler to fire the time trigger, so resume the interrupt explicitly when testing locally.

---

## Racing Multiple Resume Conditions

Pass a list of interrupt models to suspend on multiple resume conditions at once. The SDK creates sibling triggers for the same interrupt; whichever trigger fires first resumes the agent, and Orchestrator removes the remaining sibling triggers for that interrupt.

```python
from datetime import datetime, timedelta, timezone

from langgraph.types import interrupt
from uipath.platform.common import InvokeProcess, WaitUntil

result = interrupt(
[
InvokeProcess(
name="long-running-agent",
input_arguments={"message": "start"},
),
WaitUntil(
resume_time=datetime.now(timezone.utc) + timedelta(minutes=10),
),
]
)

if isinstance(result, dict) and "resumeTime" in result:
raise TimeoutError("Process did not finish before the timer fired.")
```

---

## Resuming with a plain value (API trigger)

All the models above are typed interrupts that tie the agent's wait state to a specific UiPath operation. When you call `interrupt(...)` with a value that is **not** one of those models, most commonly a plain string but any JSON-serializable value works, the SDK creates an **API resume trigger**. The agent suspends and waits to be resumed by an explicit external API call rather than by polling a UiPath operation.
Expand Down
6 changes: 6 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ This sample shows how to automate Outlook inbox organization with AI-powered rul
## [HITL inbox server](hitl-inbox-server)
This sample demonstrates a FastAPI server for managing human-in-the-loop workflows with job submissions and inbox message approvals.

## [Wait until agent](wait-until-agent)
This sample demonstrates a LangGraph agent that suspends with `WaitUntil` and resumes when Orchestrator fires a timer resume trigger.

## [Invoke process timer race agent](invoke-process-timer-race-agent)
This sample demonstrates racing an `InvokeProcess` interrupt with a `WaitUntil` timer interrupt.

## [Multi agent supervisor, researcher, coder](multi-agent-supervisor-researcher-coder)
This sample showcases a multi-agent system, involving a supervisor, a researcher, and a coder working in coordination to tackle complex tasks.

Expand Down
7 changes: 7 additions & 0 deletions samples/invoke-process-timer-race-agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Invoke Process Timer Race Agent

This sample demonstrates racing a child process interrupt with a timer interrupt.

The parent graph suspends with `interrupt([InvokeProcess(...), WaitUntil(...)])`. If `CHILD_PROCESS_NAME = "timeout-child-agent"` does not complete before the timer fires, Orchestrator resumes the parent through the timer trigger first and the sample raises `TimeoutError`.

The child process is declared in `bindings.json`. The sample code passes `process_folder_path="Shared"` when invoking the child process so the process lookup is explicit and can be changed with the binding to any process in the target organization.
26 changes: 26 additions & 0 deletions samples/invoke-process-timer-race-agent/bindings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"version": "2.0",
"resources": [
{
"resource": "process",
"key": "timeout-child-agent.Shared",
"value": {
"name": {
"defaultValue": "timeout-child-agent",
"isExpression": false,
"displayName": "Process Name"
},
"folderPath": {
"defaultValue": "Shared",
"isExpression": false,
"displayName": "Process Folder Path"
}
},
"metadata": {
"ActivityName": "InvokeProcess",
"BindingsVersion": "2.2",
"DisplayLabel": "timeout-child-agent"
}
}
]
}
46 changes: 46 additions & 0 deletions samples/invoke-process-timer-race-agent/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from datetime import datetime, timedelta, timezone
from typing import TypedDict

from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt
from uipath.platform.common import InvokeProcess, WaitUntil

CHILD_PROCESS_NAME = "timeout-child-agent"
PROCESS_FOLDER_PATH = "Shared"


class ParentState(TypedDict, total=False):
message: str
status: str
child_result: str


def parent_node(state: ParentState) -> ParentState:
child_result = interrupt(
[
InvokeProcess(
name=CHILD_PROCESS_NAME,
process_folder_path=PROCESS_FOLDER_PATH,
input_arguments={"message": state.get("message", "start child work")},
),
WaitUntil(
# allows up to 10 minutes for the child process to finish.
resume_time=datetime.now(timezone.utc) + timedelta(minutes=10),
),
]
)

if isinstance(child_result, dict) and "resumeTime" in child_result:
raise TimeoutError("Child process did not finish before the timer fired.")

return {
"status": "completed",
"child_result": str(child_result),
}


parent_builder = StateGraph(ParentState)
parent_builder.add_node("parent", parent_node)
parent_builder.add_edge(START, "parent")
parent_builder.add_edge("parent", END)
parent_graph = parent_builder.compile()
7 changes: 7 additions & 0 deletions samples/invoke-process-timer-race-agent/langgraph.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"dependencies": ["."],
"graphs": {
"timer-race-parent-agent": "./graph.py:parent_graph"
},
"env": ".env"
}
15 changes: 15 additions & 0 deletions samples/invoke-process-timer-race-agent/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[project]
name = "invoke-process-timer-race-agent"
version = "0.0.1"
description = "LangGraph sample showing InvokeProcess and WaitUntil timer racing"
authors = [{ name = "John Doe", email = "john.doe@myemail.com" }]
requires-python = ">=3.11"
dependencies = [
"uipath",
"uipath-langchain",
]

[dependency-groups]
dev = [
"uipath-dev",
]
14 changes: 14 additions & 0 deletions samples/invoke-process-timer-race-agent/uipath.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "https://cloud.uipath.com/draft/2024-12/uipath",
"runtimeOptions": {
"isConversational": false
},
"packOptions": {
"fileExtensionsIncluded": [],
"filesIncluded": [],
"filesExcluded": [],
"directoriesExcluded": [],
"includeUvLock": true
},
"functions": {}
}
7 changes: 7 additions & 0 deletions samples/wait-until-agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Wait Until Agent

This sample demonstrates a LangGraph agent that suspends with `WaitUntil` and resumes when Orchestrator fires the timer resume trigger.

The resume time is an absolute timezone-aware `datetime`. The SDK normalizes it to UTC before creating the timer trigger.

The sample includes an empty `bindings.json` file so the project has the same deployable shape as samples that declare resource bindings.
4 changes: 4 additions & 0 deletions samples/wait-until-agent/bindings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"version": "2.0",
"resources": []
}
31 changes: 31 additions & 0 deletions samples/wait-until-agent/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from datetime import datetime, timedelta, timezone
from typing import TypedDict

from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt
from uipath.platform.common import WaitUntil


class State(TypedDict, total=False):
message: str
resumed_at: str
resume_payload: str


def wait_until_node(state: State) -> State:
resume_time = datetime.now(timezone.utc) + timedelta(minutes=5)
resume_payload = interrupt(WaitUntil(resume_time=resume_time))

return {
"message": state.get("message", "wait completed"),
"resumed_at": datetime.now(timezone.utc).isoformat(),
"resume_payload": str(resume_payload),
}


builder = StateGraph(State)
builder.add_node("wait_until", wait_until_node)
builder.add_edge(START, "wait_until")
builder.add_edge("wait_until", END)

graph = builder.compile()
7 changes: 7 additions & 0 deletions samples/wait-until-agent/langgraph.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"dependencies": ["."],
"graphs": {
"wait-until-agent": "./graph.py:graph"
},
"env": ".env"
}
15 changes: 15 additions & 0 deletions samples/wait-until-agent/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[project]
name = "wait-until-agent"
version = "0.0.1"
description = "LangGraph sample showing WaitUntil timer-based resume triggers"
authors = [{ name = "John Doe", email = "john.doe@myemail.com" }]
requires-python = ">=3.11"
dependencies = [
"uipath",
"uipath-langchain",
]

[dependency-groups]
dev = [
"uipath-dev",
]
14 changes: 14 additions & 0 deletions samples/wait-until-agent/uipath.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "https://cloud.uipath.com/draft/2024-12/uipath",
"runtimeOptions": {
"isConversational": false
},
"packOptions": {
"fileExtensionsIncluded": [],
"filesIncluded": [],
"filesExcluded": [],
"directoriesExcluded": [],
"includeUvLock": true
},
"functions": {}
}
51 changes: 28 additions & 23 deletions src/uipath_langchain/runtime/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,33 +74,38 @@ async def save_triggers(
await self._ensure_table()

async with self.memory.lock, self.memory.conn.cursor() as cur:
# Delete all existing triggers for this runtime_id
await cur.execute(
f"""
DELETE FROM {self.rs_table_name}
WHERE runtime_id = ?
""",
(runtime_id,),
)

for trigger in triggers:
trigger_data = trigger.model_dump()
trigger_data["payload"] = trigger.payload
trigger_data["trigger_name"] = trigger.trigger_name

try:
await cur.execute("BEGIN IMMEDIATE")
# Delete all existing triggers for this runtime_id
await cur.execute(
f"""
INSERT INTO {self.rs_table_name}
(runtime_id, interrupt_id, data)
VALUES (?, ?, ?)
DELETE FROM {self.rs_table_name}
WHERE runtime_id = ?
""",
(
runtime_id,
trigger.interrupt_id,
serialize_json(trigger_data),
),
(runtime_id,),
)
await self.memory.conn.commit()

for trigger in triggers:
trigger_data = trigger.model_dump()
trigger_data["payload"] = trigger.payload
trigger_data["trigger_name"] = trigger.trigger_name

await cur.execute(
f"""
INSERT INTO {self.rs_table_name}
(runtime_id, interrupt_id, data)
VALUES (?, ?, ?)
""",
(
runtime_id,
trigger.interrupt_id,
serialize_json(trigger_data),
),
)
await self.memory.conn.commit()
except Exception:
await self.memory.conn.rollback()
raise

async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None:
"""Get all triggers for runtime_id from database."""
Expand Down
33 changes: 33 additions & 0 deletions tests/hitl/mocks/time_trigger_hitl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from datetime import datetime, timedelta, timezone
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt
from uipath.platform.common import WaitUntil


class State(TypedDict):
message: str


def main_node(state: State) -> State:
response = interrupt(
WaitUntil(
resume_time=datetime.now(timezone.utc) + timedelta(minutes=1),
)
)
return {"message": str(response)}


builder: StateGraph[State] = StateGraph(State)

builder.add_node("main_node", main_node)

builder.add_edge(START, "main_node")
builder.add_edge("main_node", END)


memory = MemorySaver()

graph = builder.compile(checkpointer=memory)
Loading
Loading