Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/human_in_the_loop.md
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,32 @@ For a practical implementation, refer to the [email-triage-agent sample](https:/

---

## Time triggers

### WaitTimeTrigger

Suspends the agent until an Orchestrator timer fires. The SDK stores the resume timestamp as part of the resume condition, and Orchestrator resumes the suspended job when that time is reached.

| Attribute | Type | Description |
| --- | --- | --- |
| `resume_time` | `datetime \| str` | Absolute time when Orchestrator should resume the suspended job. Use a timezone-aware UTC `datetime` or an ISO 8601 timestamp. |

```python
from langgraph.types import interrupt
from uipath.platform.common import WaitTimeTrigger
from datetime import datetime, timedelta, timezone

resume_payload = interrupt(
WaitTimeTrigger(
resume_time=datetime.now(timezone.utc) + timedelta(minutes=5),
)
)
```

This is intended for deployed jobs where Orchestrator owns the scheduler. In a local run there is no scheduler to fire the time trigger, so resume the interrupt explicitly when testing locally.

---

## Resuming with a plain value (API trigger)

All the models above are typed interrupts that tie the agent's wait state to a specific UiPath operation. When you call `interrupt(...)` with a value that is **not** one of those models, most commonly a plain string but any JSON-serializable value works, the SDK creates an **API resume trigger**. The agent suspends and waits to be resumed by an explicit external API call rather than by polling a UiPath operation.
Expand Down
33 changes: 33 additions & 0 deletions tests/hitl/mocks/time_trigger_hitl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from datetime import datetime, timedelta, timezone
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt
from uipath.platform.common import WaitTimeTrigger


class State(TypedDict):
message: str


def main_node(state: State) -> State:
response = interrupt(
WaitTimeTrigger(
resume_time=datetime.now(timezone.utc) + timedelta(minutes=1),
)
)
return {"message": str(response)}


builder: StateGraph[State] = StateGraph(State)

builder.add_node("main_node", main_node)

builder.add_edge(START, "main_node")
builder.add_edge("main_node", END)


memory = MemorySaver()

graph = builder.compile(checkpointer=memory)
95 changes: 95 additions & 0 deletions tests/runtime/test_resumable.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import os
import tempfile
from datetime import datetime, timezone
from typing import Any, TypedDict

import pytest
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt
from pydantic import BaseModel, ConfigDict, Field
from uipath.core.errors import ErrorCategory, UiPathPendingTriggerError
from uipath.core.triggers import (
UiPathResumeTrigger,
Expand Down Expand Up @@ -86,6 +88,99 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any:
return f"Response for {branch_name}"


class TimerLikeInterrupt(BaseModel):
"""Timer-shaped model used to verify typed interrupt pass-through."""

resume_time: datetime | str = Field(alias="resumeTime")

model_config = ConfigDict(validate_by_name=True)


class CapturingTimerTriggerHandler:
"""Mock implementation that captures typed time trigger interrupt values."""

def __init__(self):
self.suspend_values: list[TimerLikeInterrupt] = []

async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger:
"""Create a timer trigger from the typed suspend value."""
assert isinstance(suspend_value, TimerLikeInterrupt)
self.suspend_values.append(suspend_value)
return UiPathResumeTrigger(
trigger_type=UiPathResumeTriggerType.TIMER,
trigger_name=UiPathResumeTriggerName.TIMER,
payload=suspend_value.model_dump(by_alias=True),
)

async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any:
"""Timer triggers are resumed externally and remain pending locally."""
raise UiPathPendingTriggerError(
ErrorCategory.SYSTEM, "Time trigger is still pending"
)


@pytest.mark.asyncio
async def test_typed_time_interrupt_is_forwarded_to_trigger_handler():
"""LangGraph interrupt values are passed through for timer-shaped models."""

class State(TypedDict, total=False):
message: str | None

def wait_for_time(state: State) -> State:
result = interrupt(
TimerLikeInterrupt(
resume_time=datetime(2026, 6, 27, 20, 14, 49, tzinfo=timezone.utc),
)
)
return {"message": f"resumed with: {result}"}

graph = StateGraph(State)
graph.add_node("wait_for_time", wait_for_time)
graph.add_edge(START, "wait_for_time")
graph.add_edge("wait_for_time", END)

temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db")
temp_db.close()

try:
async with AsyncSqliteSaver.from_conn_string(temp_db.name) as memory:
compiled_graph = graph.compile(checkpointer=memory)
base_runtime = UiPathLangGraphRuntime(
graph=compiled_graph,
runtime_id="time-trigger-pass-through-test",
entrypoint="test",
)
storage = SqliteResumableStorage(memory)
trigger_handler = CapturingTimerTriggerHandler()
runtime = UiPathResumableRuntime(
delegate=base_runtime,
storage=storage,
trigger_manager=trigger_handler,
runtime_id="time-trigger-pass-through-test",
)

result = await runtime.execute(
input={"message": None},
options=UiPathExecuteOptions(resume=False),
)

assert result.status == UiPathRuntimeStatus.SUSPENDED
assert len(trigger_handler.suspend_values) == 1
suspend_value = trigger_handler.suspend_values[0]
assert suspend_value.resume_time == datetime(
2026, 6, 27, 20, 14, 49, tzinfo=timezone.utc
)
assert result.triggers is not None
assert result.triggers[0].trigger_type == UiPathResumeTriggerType.TIMER
assert result.triggers[0].payload == {
"resumeTime": datetime(
2026, 6, 27, 20, 14, 49, tzinfo=timezone.utc
),
}
finally:
os.unlink(temp_db.name)


@pytest.mark.asyncio
async def test_parallel_branches_with_sequential_trigger_resolution():
"""Test graph execution with parallel branches where triggers resolve sequentially."""
Expand Down
Loading