From a0eb8ada9343a977971bb14feaadb9b0f59301fe Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Tue, 30 Jun 2026 12:42:21 +0300 Subject: [PATCH] feat: support multiple resume triggers per interrupt --- .github/scripts/force-runtime-override.py | 91 ++++ .github/workflows/test-uipath-langchain.yml | 14 +- .github/workflows/test-uipath.yml | 14 +- pyproject.toml | 4 +- src/uipath/runtime/resumable/protocols.py | 41 +- src/uipath/runtime/resumable/runtime.py | 86 +++- tests/test_resumable.py | 437 +++++++++++++++++++- uv.lock | 10 +- 8 files changed, 647 insertions(+), 50 deletions(-) create mode 100644 .github/scripts/force-runtime-override.py diff --git a/.github/scripts/force-runtime-override.py b/.github/scripts/force-runtime-override.py new file mode 100644 index 0000000..5abbacd --- /dev/null +++ b/.github/scripts/force-runtime-override.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +"""Force cross-repo uv resolves to use the runtime wheel under test.""" + +from __future__ import annotations + +import re +import sys +from pathlib import Path + + +def _quoted(value: str) -> str: + return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"' + + +def _add_override(pyproject_path: Path, override: str) -> None: + text = pyproject_path.read_text() + quoted_override = _quoted(override) + + if override in text: + return + + tool_uv_match = re.search( + r"(?ms)^\[tool\.uv\]\n(?P.*?)(?=^\[|\Z)", + text, + ) + if not tool_uv_match: + text = ( + text.rstrip() + + f"\n\n[tool.uv]\noverride-dependencies = [{quoted_override}]\n" + ) + pyproject_path.write_text(text) + return + + body = tool_uv_match.group("body") + override_match = re.search( + r"(?ms)^override-dependencies\s*=\s*\[(?P.*?)\]\s*", + body, + ) + if override_match: + items = override_match.group("items").strip() + if quoted_override in items: + return + + if items: + replacement = f"override-dependencies = [{items}, {quoted_override}]\n" + else: + replacement = f"override-dependencies = [{quoted_override}]\n" + + body = ( + body[: override_match.start()] + replacement + body[override_match.end() :] + ) + else: + body = f"override-dependencies = [{quoted_override}]\n" + body + + text = ( + text[: tool_uv_match.start("body")] + body + text[tool_uv_match.end("body") :] + ) + pyproject_path.write_text(text) + + +def main() -> None: + """Add a local runtime wheel override to a pyproject and print the wheel path.""" + if len(sys.argv) != 3: + print( + "usage: force-runtime-override.py ", + file=sys.stderr, + ) + raise SystemExit(2) + + pyproject_path = Path(sys.argv[1]) + wheel_dir = Path(sys.argv[2]) + wheels = sorted(wheel_dir.glob("uipath_runtime-*.whl")) + if not wheels: + print(f"no uipath-runtime wheel found in {wheel_dir}", file=sys.stderr) + raise SystemExit(1) + if len(wheels) > 1: + wheel_names = ", ".join(str(wheel) for wheel in wheels) + print( + f"expected one uipath-runtime wheel in {wheel_dir}, found: {wheel_names}", + file=sys.stderr, + ) + raise SystemExit(1) + + wheel = wheels[0].resolve() + + _add_override(pyproject_path, f"uipath-runtime @ {wheel.as_uri()}") + print(wheel) + + +if __name__ == "__main__": + main() diff --git a/.github/workflows/test-uipath-langchain.yml b/.github/workflows/test-uipath-langchain.yml index d30d20e..dc1145d 100644 --- a/.github/workflows/test-uipath-langchain.yml +++ b/.github/workflows/test-uipath-langchain.yml @@ -45,7 +45,9 @@ jobs: - name: Update uipath-runtime version shell: bash working-directory: uipath-langchain-python - run: uv add ../uipath-runtime-python/dist/*.whl --dev + run: | + runtime_wheel="$(python ../uipath-runtime-python/.github/scripts/force-runtime-override.py pyproject.toml ../uipath-runtime-python/dist)" + uv add "$runtime_wheel" --dev - name: Run uipath-langchain tests working-directory: uipath-langchain-python @@ -89,7 +91,9 @@ jobs: - name: Update uipath-runtime version shell: bash working-directory: uipath-langchain-python - run: uv add ../uipath-runtime-python/dist/*.whl --dev + run: | + runtime_wheel="$(python ../uipath-runtime-python/.github/scripts/force-runtime-override.py pyproject.toml ../uipath-runtime-python/dist)" + uv add "$runtime_wheel" --dev - name: Run typecheck working-directory: uipath-langchain-python @@ -163,7 +167,9 @@ jobs: - name: Update uipath-runtime version shell: bash working-directory: uipath-langchain-python - run: uv add ../uipath-runtime-python/dist/*.whl --dev + run: | + runtime_wheel="$(python ../uipath-runtime-python/.github/scripts/force-runtime-override.py pyproject.toml ../uipath-runtime-python/dist)" + uv add "$runtime_wheel" --dev - name: Install dependencies working-directory: uipath-langchain-python @@ -182,8 +188,8 @@ jobs: echo "Environment: ${{ matrix.environment }}" echo "LLM: ${{ matrix.use_azure_chat && 'UiPathAzureChatOpenAI' || 'UiPathChat' }}" echo "USE_AZURE_CHAT: ${{ matrix.use_azure_chat }}" + python ../../../uipath-runtime-python/.github/scripts/force-runtime-override.py pyproject.toml ../../../uipath-runtime-python/dist # Execute the testcase run script directly bash run.sh bash ../common/validate_output.sh - diff --git a/.github/workflows/test-uipath.yml b/.github/workflows/test-uipath.yml index ddcc5e1..347504f 100644 --- a/.github/workflows/test-uipath.yml +++ b/.github/workflows/test-uipath.yml @@ -45,7 +45,9 @@ jobs: - name: Update uipath-runtime version shell: bash working-directory: uipath-python/packages/uipath - run: uv add ../../../uipath-runtime-python/dist/*.whl --dev + run: | + runtime_wheel="$(python ../../../uipath-runtime-python/.github/scripts/force-runtime-override.py pyproject.toml ../../../uipath-runtime-python/dist)" + uv add "$runtime_wheel" --dev - name: Run uipath tests working-directory: uipath-python/packages/uipath @@ -89,7 +91,9 @@ jobs: - name: Update uipath-runtime version shell: bash working-directory: uipath-python/packages/uipath - run: uv add ../../../uipath-runtime-python/dist/*.whl --dev + run: | + runtime_wheel="$(python ../../../uipath-runtime-python/.github/scripts/force-runtime-override.py pyproject.toml ../../../uipath-runtime-python/dist)" + uv add "$runtime_wheel" --dev - name: Run typecheck working-directory: uipath-python/packages/uipath @@ -162,7 +166,9 @@ jobs: - name: Update uipath-runtime version shell: bash working-directory: uipath-python/packages/uipath - run: uv add ../../../uipath-runtime-python/dist/*.whl --dev + run: | + runtime_wheel="$(python ../../../uipath-runtime-python/.github/scripts/force-runtime-override.py pyproject.toml ../../../uipath-runtime-python/dist)" + uv add "$runtime_wheel" --dev - name: Install dependencies working-directory: uipath-python/packages/uipath @@ -179,8 +185,8 @@ jobs: run: | echo "Running testcase: ${{ matrix.testcase }}" echo "Environment: ${{ matrix.environment }}" + python ../../../../../uipath-runtime-python/.github/scripts/force-runtime-override.py pyproject.toml ../../../../../uipath-runtime-python/dist # Execute the testcase run script directly bash run.sh bash ../common/validate_output.sh - diff --git a/pyproject.toml b/pyproject.toml index 2bf085c..7868fc9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,11 @@ [project] name = "uipath-runtime" -version = "0.11.5" +version = "0.12.0" description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" dependencies = [ - "uipath-core>=0.5.26,<0.6.0", + "uipath-core>=0.5.27,<0.6.0", ] classifiers = [ "Intended Audience :: Developers", diff --git a/src/uipath/runtime/resumable/protocols.py b/src/uipath/runtime/resumable/protocols.py index 7ce77c8..e5dd8de 100644 --- a/src/uipath/runtime/resumable/protocols.py +++ b/src/uipath/runtime/resumable/protocols.py @@ -1,5 +1,6 @@ """Module defining the protocol for resume trigger storage.""" +import warnings from typing import Any, Protocol from uipath.core.triggers import UiPathResumeTrigger @@ -34,6 +35,21 @@ async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | Non """ ... + async def delete_triggers( + self, runtime_id: str, triggers: list[UiPathResumeTrigger] + ) -> None: + """Delete resume triggers from storage. + + Args: + runtime_id: The runtime ID + triggers: The resume triggers to delete + + Raises: + Exception: If deletion operation fails + """ + for trigger in triggers: + await self.delete_trigger(runtime_id, trigger) + async def delete_trigger( self, runtime_id: str, trigger: UiPathResumeTrigger ) -> None: @@ -46,7 +62,13 @@ async def delete_trigger( Raises: Exception: If deletion operation fails """ - ... + warnings.warn( + "delete_trigger() is deprecated; use delete_triggers() instead.", + DeprecationWarning, + stacklevel=2, + ) + # TODO: Remove this compatibility alias in the next minor version. + await self.delete_triggers(runtime_id, [trigger]) class UiPathResumeTriggerCreatorProtocol(Protocol): @@ -68,6 +90,23 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: """ ... + async def create_triggers(self, suspend_value: Any) -> list[UiPathResumeTrigger]: + """Create resume triggers from a suspend value. + + Most suspend values produce one trigger. Composite values may produce + multiple sibling triggers for the same interrupt. + + Args: + suspend_value: The value that caused the suspension. + + Returns: + UiPathResumeTrigger objects ready to be persisted. + + Raises: + UiPathRuntimeError: If trigger creation fails + """ + return [await self.create_trigger(suspend_value)] + class UiPathResumeTriggerReaderProtocol(Protocol): """Protocol for reading resume triggers and converting them to runtime input.""" diff --git a/src/uipath/runtime/resumable/runtime.py b/src/uipath/runtime/resumable/runtime.py index 8cf2ed5..4b03dd7 100644 --- a/src/uipath/runtime/resumable/runtime.py +++ b/src/uipath/runtime/resumable/runtime.py @@ -1,10 +1,16 @@ """Resumable runtime protocol and implementation.""" import logging +from collections import Counter +from enum import Enum from typing import Any, AsyncGenerator from uipath.core.errors import UiPathPendingTriggerError -from uipath.core.triggers import UiPathResumeTrigger, UiPathResumeTriggerType +from uipath.core.triggers import ( + UIPATH_METADATA_KEY, + UiPathResumeTrigger, + UiPathResumeTriggerType, +) from uipath.runtime.base import ( UiPathExecuteOptions, @@ -168,7 +174,7 @@ async def _get_fired_triggers(self) -> dict[str, Any] | None: UiPathResumeTriggerType.TIMER, ) ] - return await self._build_resume_map(pollable_triggers) + return await self._build_resume_map(pollable_triggers, all_triggers=triggers) async def _restore_resume_input( self, @@ -190,15 +196,17 @@ async def _restore_resume_input( if triggers: if len(triggers) == 1: # Single trigger - just delete it - await self.storage.delete_trigger(self.runtime_id, triggers[0]) + await self.storage.delete_triggers(self.runtime_id, triggers) else: # Multiple triggers - match by interrupt_id - found = False - for trigger in triggers: - if trigger.interrupt_id in input: - await self.storage.delete_trigger(self.runtime_id, trigger) - found = True - if not found: + matched_triggers = [ + trigger for trigger in triggers if trigger.interrupt_id in input + ] + if matched_triggers: + await self.storage.delete_triggers( + self.runtime_id, matched_triggers + ) + else: logger.warning( f"Multiple triggers detected but none match the provided input. " f"Please specify which trigger to resume by {{interrupt_id: value}}. " @@ -213,31 +221,70 @@ async def _restore_resume_input( async def _build_resume_map( self, - triggers: list[UiPathResumeTrigger], + pollable_triggers: list[UiPathResumeTrigger], + all_triggers: list[UiPathResumeTrigger] | None = None, ) -> dict[str, Any]: """Build resume map from triggers: {interrupt_id: resume_data}. Args: - triggers: List of triggers to read and map + pollable_triggers: List of triggers to read and map. + all_triggers: Full trigger set to use when detecting sibling triggers. Returns: A dict mapping interrupt_id to the trigger's resume data. """ resume_map: dict[str, Any] = {} - for trigger in triggers: + trigger_count_by_interrupt_id = Counter( + trigger.interrupt_id for trigger in all_triggers or pollable_triggers + ) + for trigger in pollable_triggers: + assert trigger.interrupt_id is not None, ( + "Trigger interrupt_id cannot be None" + ) + if trigger.interrupt_id in resume_map: + continue + try: data = await self.trigger_manager.read_trigger(trigger) - assert trigger.interrupt_id is not None, ( - "Trigger interrupt_id cannot be None" - ) + if trigger_count_by_interrupt_id[trigger.interrupt_id] > 1: + data = self._with_trigger_metadata(trigger, data) resume_map[trigger.interrupt_id] = data - await self.storage.delete_trigger(self.runtime_id, trigger) + sibling_triggers = [ + sibling + for sibling in all_triggers or pollable_triggers + if sibling.interrupt_id == trigger.interrupt_id + ] + await self.storage.delete_triggers(self.runtime_id, sibling_triggers) except UiPathPendingTriggerError: # Trigger still pending, skip it pass return resume_map + def _with_trigger_metadata( + self, trigger: UiPathResumeTrigger, data: Any + ) -> dict[str, Any]: + metadata = { + "triggerType": self._metadata_value(trigger.trigger_type), + "triggerName": self._metadata_value(trigger.trigger_name), + } + + if isinstance(data, dict): + existing_metadata = data.get(UIPATH_METADATA_KEY) + if isinstance(existing_metadata, dict): + metadata = {**existing_metadata, **metadata} + + return {**data, UIPATH_METADATA_KEY: metadata} + + return {UIPATH_METADATA_KEY: metadata, "value": data} + + @staticmethod + def _metadata_value(value: Any) -> Any: + if isinstance(value, Enum): + return value.value + + return value + async def _handle_suspension( self, result: UiPathRuntimeResult ) -> UiPathRuntimeResult: @@ -273,11 +320,12 @@ async def _handle_suspension( # Create triggers only for new interrupts for interrupt_id in new_ids: - trigger = await self.trigger_manager.create_trigger( + triggers = await self.trigger_manager.create_triggers( current_interrupts[interrupt_id] ) - trigger.interrupt_id = interrupt_id - suspended_result.triggers.append(trigger) + for trigger in triggers: + trigger.interrupt_id = interrupt_id + suspended_result.triggers.append(trigger) if suspended_result.triggers: await self.storage.save_triggers(self.runtime_id, suspended_result.triggers) diff --git a/tests/test_resumable.py b/tests/test_resumable.py index 16d0c41..dc8407e 100644 --- a/tests/test_resumable.py +++ b/tests/test_resumable.py @@ -7,7 +7,11 @@ import pytest from uipath.core.errors import ErrorCategory, UiPathPendingTriggerError -from uipath.core.triggers import UiPathResumeTrigger, UiPathResumeTriggerType +from uipath.core.triggers import ( + UiPathResumeTrigger, + UiPathResumeTriggerName, + UiPathResumeTriggerType, +) from uipath.runtime import ( UiPathExecuteOptions, @@ -17,13 +21,14 @@ ) from uipath.runtime.events import UiPathRuntimeEvent from uipath.runtime.resumable.protocols import ( + UiPathResumableStorageProtocol, UiPathResumeTriggerProtocol, ) from uipath.runtime.resumable.runtime import UiPathResumableRuntime from uipath.runtime.schema import UiPathRuntimeSchema -class MultiTriggerMockRuntime: +class SiblingTriggerMockRuntime: """Mock runtime that simulates parallel branching with multiple interrupts.""" def __init__(self) -> None: @@ -89,6 +94,56 @@ async def get_schema(self) -> UiPathRuntimeSchema: raise NotImplementedError() +class MultiTriggerMockRuntime: + """Mock runtime that suspends on a multi-trigger interrupt, then suspends again.""" + + def __init__(self) -> None: + self.execution_count = 0 + + async def dispose(self) -> None: + pass + + async def execute( + self, + input: dict[str, Any] | None = None, + options: UiPathExecuteOptions | None = None, + ) -> UiPathRuntimeResult: + self.execution_count += 1 + is_resume = options and options.resume + + if self.execution_count == 1: + return UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + output={"child-1": {"process": "first-child"}}, + ) + + assert is_resume + assert input == { + "child-1": { + "completed": True, + "__uipath": { + "triggerType": UiPathResumeTriggerType.JOB.value, + "triggerName": "Unknown", + }, + } + } + return UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + output={"child-2": {"process": "second-child"}}, + ) + + async def stream( + self, + input: dict[str, Any] | None = None, + options: UiPathStreamOptions | None = None, + ) -> AsyncGenerator[UiPathRuntimeEvent, None]: + result = await self.execute(input, options) + yield result + + async def get_schema(self) -> UiPathRuntimeSchema: + raise NotImplementedError() + + class StatefulStorageMock: """Stateful storage mock that tracks triggers.""" @@ -106,8 +161,14 @@ async def save_triggers( async def delete_trigger( self, runtime_id: str, trigger: UiPathResumeTrigger ) -> None: + await self.delete_triggers(runtime_id, [trigger]) + + async def delete_triggers( + self, runtime_id: str, triggers: list[UiPathResumeTrigger] + ) -> None: + interrupt_ids = {trigger.interrupt_id for trigger in triggers} self.triggers = [ - t for t in self.triggers if t.interrupt_id != trigger.interrupt_id + t for t in self.triggers if t.interrupt_id not in interrupt_ids ] async def set_value( @@ -119,6 +180,80 @@ async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any: return None +class DeprecatedDeleteAliasStorage(UiPathResumableStorageProtocol): + """Storage that relies on the protocol's deprecated singular delete alias.""" + + def __init__(self) -> None: + self.deleted_triggers: list[UiPathResumeTrigger] = [] + + async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger]: + return [] + + async def save_triggers( + self, runtime_id: str, triggers: list[UiPathResumeTrigger] + ) -> None: + pass + + async def delete_triggers( + self, runtime_id: str, triggers: list[UiPathResumeTrigger] + ) -> None: + self.deleted_triggers.extend(triggers) + + async def set_value( + self, runtime_id: str, namespace: str, key: str, value: Any + ) -> None: + pass + + async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any: + return None + + +class DefaultBulkDeleteStorage(UiPathResumableStorageProtocol): + """Storage that relies on the protocol's plural delete default.""" + + def __init__(self) -> None: + self.deleted_triggers: list[UiPathResumeTrigger] = [] + + async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger]: + return [] + + async def save_triggers( + self, runtime_id: str, triggers: list[UiPathResumeTrigger] + ) -> None: + pass + + async def delete_trigger( + self, runtime_id: str, trigger: UiPathResumeTrigger + ) -> None: + self.deleted_triggers.append(trigger) + + async def set_value( + self, runtime_id: str, namespace: str, key: str, value: Any + ) -> None: + pass + + async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any: + return None + + +class DefaultCreateTriggersManager(UiPathResumeTriggerProtocol): + """Trigger manager that relies on the protocol's plural create default.""" + + def __init__(self) -> None: + self.created_values: list[Any] = [] + + async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: + self.created_values.append(suspend_value) + return UiPathResumeTrigger( + interrupt_id="", + trigger_type=UiPathResumeTriggerType.TASK, + payload=suspend_value, + ) + + async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None: + return None + + def make_trigger_manager_mock() -> UiPathResumeTriggerProtocol: """Create trigger manager mock.""" manager = Mock(spec=UiPathResumeTriggerProtocol) @@ -134,19 +269,227 @@ async def read_trigger_default(trigger: UiPathResumeTrigger) -> dict[str, Any]: raise UiPathPendingTriggerError(ErrorCategory.USER, "Trigger not fired yet") manager.create_trigger = AsyncMock(side_effect=create_trigger_impl) + + async def create_triggers_impl(data: dict[str, Any]) -> list[UiPathResumeTrigger]: + return [await manager.create_trigger(data)] + + manager.create_triggers = AsyncMock(side_effect=create_triggers_impl) manager.read_trigger = AsyncMock(side_effect=read_trigger_default) return cast(UiPathResumeTriggerProtocol, manager) class TestResumableRuntime: + @pytest.mark.asyncio + async def test_delete_triggers_default_delegates_to_singular_delete( + self, + ) -> None: + """Plural delete compatibility default delegates one trigger at a time.""" + + storage = DefaultBulkDeleteStorage() + triggers = [ + UiPathResumeTrigger( + interrupt_id="int-1", + trigger_type=UiPathResumeTriggerType.TASK, + ), + UiPathResumeTrigger( + interrupt_id="int-2", + trigger_type=UiPathResumeTriggerType.TASK, + ), + ] + + await storage.delete_triggers("runtime-1", triggers) + + assert storage.deleted_triggers == triggers + + @pytest.mark.asyncio + async def test_delete_trigger_alias_delegates_to_delete_triggers(self) -> None: + """Deprecated singular delete API delegates to the plural API.""" + + storage = DeprecatedDeleteAliasStorage() + trigger = UiPathResumeTrigger( + interrupt_id="int-1", + trigger_type=UiPathResumeTriggerType.TASK, + ) + + with pytest.warns( + DeprecationWarning, + match="delete_trigger\\(\\) is deprecated; use delete_triggers\\(\\) instead", + ): + await storage.delete_trigger("runtime-1", trigger) + + assert storage.deleted_triggers == [trigger] + + @pytest.mark.asyncio + async def test_create_triggers_default_delegates_to_create_trigger(self) -> None: + """Plural create compatibility default wraps a single created trigger.""" + + manager = DefaultCreateTriggersManager() + + triggers = await manager.create_triggers({"action": "approve"}) + + assert len(triggers) == 1 + assert triggers[0].payload == {"action": "approve"} + assert manager.created_values == [{"action": "approve"}] + + def test_with_trigger_metadata_merges_existing_uipath_metadata(self) -> None: + """Existing UiPath metadata is preserved when trigger metadata is added.""" + + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + resumable = UiPathResumableRuntime( + delegate=SiblingTriggerMockRuntime(), + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + trigger = UiPathResumeTrigger( + interrupt_id="int-1", + trigger_type=UiPathResumeTriggerType.TIMER, + trigger_name=UiPathResumeTriggerName.TIMER, + ) + + result = resumable._with_trigger_metadata( + trigger, + {"__uipath": {"kind": "timeout", "timeout": 10}, "value": "done"}, + ) + + assert result == { + "__uipath": { + "kind": "timeout", + "timeout": 10, + "triggerType": UiPathResumeTriggerType.TIMER.value, + "triggerName": UiPathResumeTriggerName.TIMER.value, + }, + "value": "done", + } + + def test_with_trigger_metadata_wraps_non_mapping_data(self) -> None: + """Non-mapping resume values are wrapped with trigger metadata.""" + + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + resumable = UiPathResumableRuntime( + delegate=SiblingTriggerMockRuntime(), + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + trigger = UiPathResumeTrigger( + interrupt_id="int-1", + trigger_type=UiPathResumeTriggerType.TASK, + trigger_name=UiPathResumeTriggerName.TASK, + ) + + result = resumable._with_trigger_metadata(trigger, "approved") + + assert result == { + "__uipath": { + "triggerType": UiPathResumeTriggerType.TASK.value, + "triggerName": UiPathResumeTriggerName.TASK.value, + }, + "value": "approved", + } + assert resumable._metadata_value("CustomName") == "CustomName" + + @pytest.mark.asyncio + async def test_restore_resume_input_deletes_single_trigger_for_explicit_input( + self, + ) -> None: + """Explicit resume input clears the only stored trigger.""" + + storage = StatefulStorageMock() + storage.triggers = [ + UiPathResumeTrigger( + interrupt_id="int-1", + trigger_type=UiPathResumeTriggerType.TASK, + ) + ] + resumable = UiPathResumableRuntime( + delegate=SiblingTriggerMockRuntime(), + storage=storage, + trigger_manager=make_trigger_manager_mock(), + runtime_id="runtime-1", + ) + + result = await resumable._restore_resume_input({"int-1": {"approved": True}}) + + assert result == {"int-1": {"approved": True}} + assert storage.triggers == [] + + @pytest.mark.asyncio + async def test_restore_resume_input_deletes_matching_trigger_for_explicit_input( + self, + ) -> None: + """Explicit resume input clears only matching stored triggers.""" + + trigger_1 = UiPathResumeTrigger( + interrupt_id="int-1", + trigger_type=UiPathResumeTriggerType.TASK, + ) + trigger_2 = UiPathResumeTrigger( + interrupt_id="int-2", + trigger_type=UiPathResumeTriggerType.TASK, + ) + storage = StatefulStorageMock() + storage.triggers = [trigger_1, trigger_2] + resumable = UiPathResumableRuntime( + delegate=SiblingTriggerMockRuntime(), + storage=storage, + trigger_manager=make_trigger_manager_mock(), + runtime_id="runtime-1", + ) + + result = await resumable._restore_resume_input({"int-2": {"approved": True}}) + + assert result == {"int-2": {"approved": True}} + assert storage.triggers == [trigger_1] + + @pytest.mark.asyncio + async def test_build_resume_map_skips_duplicate_pollable_trigger(self) -> None: + """Duplicate pollable triggers for one interrupt are read once.""" + + trigger_1 = UiPathResumeTrigger( + interrupt_id="int-1", + trigger_type=UiPathResumeTriggerType.TASK, + ) + trigger_2 = UiPathResumeTrigger( + interrupt_id="int-1", + trigger_type=UiPathResumeTriggerType.JOB, + ) + storage = StatefulStorageMock() + storage.triggers = [trigger_1, trigger_2] + trigger_manager = make_trigger_manager_mock() + read_trigger_mock = AsyncMock(return_value="approved") + cast(Any, trigger_manager).read_trigger = read_trigger_mock + resumable = UiPathResumableRuntime( + delegate=SiblingTriggerMockRuntime(), + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + result = await resumable._build_resume_map([trigger_1, trigger_2]) + + assert result == { + "int-1": { + "__uipath": { + "triggerType": UiPathResumeTriggerType.TASK.value, + "triggerName": "Unknown", + }, + "value": "approved", + } + } + read_trigger_mock.assert_awaited_once_with(trigger_1) + assert storage.triggers == [] + @pytest.mark.asyncio async def test_resumable_creates_multiple_triggers_on_first_suspension( self, ) -> None: """First suspension with parallel branches should create multiple triggers.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -179,7 +522,7 @@ async def test_resumable_creates_multiple_triggers_on_first_suspension( async def test_resumable_adds_only_new_triggers_on_partial_resume(self) -> None: """Partial resume should keep pending trigger and add only new ones.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -222,7 +565,7 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: async def test_resumable_completes_after_all_triggers_resolved(self) -> None: """After all triggers resolved, execution should complete successfully.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -269,7 +612,7 @@ async def read_trigger_impl_3(trigger: UiPathResumeTrigger) -> dict[str, Any]: async def test_resumable_auto_resumes_when_triggers_already_fired(self) -> None: """When triggers are already fired during suspension, runtime should auto-resume.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -308,7 +651,7 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: async def test_resumable_auto_resumes_partial_fired_triggers(self) -> None: """When only some triggers are fired during suspension, auto-resume with those.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -344,7 +687,7 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: async def test_resumable_auto_resumes_multiple_times(self) -> None: """When triggers keep being fired immediately, keep auto-resuming until complete.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -389,7 +732,7 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: async def test_resumable_stream_auto_resumes_when_triggers_fired(self) -> None: """Stream should auto-resume when triggers are already fired.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -426,7 +769,7 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: async def test_resumable_no_auto_resume_when_all_triggers_pending(self) -> None: """When all triggers are pending, should NOT auto-resume.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -457,7 +800,7 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: async def test_resumable_skips_api_triggers_on_auto_resume_check(self) -> None: """API triggers should be skipped when checking for auto-resume after suspension.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -508,7 +851,7 @@ async def test_resumable_skips_inbox_triggers_on_auto_resume_check(self) -> None 404 and fault the run. They should behave like API triggers here. """ - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -552,7 +895,7 @@ def create_inbox_trigger(data: dict[str, Any]) -> UiPathResumeTrigger: async def test_resumable_skips_timer_triggers_on_auto_resume_check(self) -> None: """Timer triggers should be skipped when checking for auto-resume.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -595,7 +938,7 @@ async def test_resumable_auto_resumes_task_triggers_but_not_api_triggers( ) -> None: """Mixed triggers: TASK triggers should trigger auto-resume, API triggers should not.""" - runtime_impl = MultiTriggerMockRuntime() + runtime_impl = SiblingTriggerMockRuntime() storage = StatefulStorageMock() trigger_manager = make_trigger_manager_mock() @@ -640,3 +983,67 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: # Delegate should have been executed twice (initial + auto-resume for TASK trigger) assert runtime_impl.execution_count == 2 + + @pytest.mark.asyncio + async def test_resumable_removes_sibling_triggers_when_one_trigger_fires( + self, + ) -> None: + """When one trigger fires, sibling triggers must not leak forward.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + async def create_sibling_triggers( + data: dict[str, Any], + ) -> list[UiPathResumeTrigger]: + return [ + UiPathResumeTrigger( + interrupt_id="", # Will be set by resumable runtime + trigger_type=UiPathResumeTriggerType.JOB, + payload={"source": data["process"]}, + ), + UiPathResumeTrigger( + interrupt_id="", # Will be set by resumable runtime + trigger_type=UiPathResumeTriggerType.TIMER, + payload={"source": data["process"]}, + ), + ] + + create_triggers_mock = AsyncMock(side_effect=create_sibling_triggers) + cast(Any, trigger_manager).create_triggers = create_triggers_mock + + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + if ( + trigger.interrupt_id == "child-1" + and trigger.trigger_type == UiPathResumeTriggerType.JOB + ): + return {"completed": True} + raise UiPathPendingTriggerError(ErrorCategory.USER, "still pending") + + read_trigger_mock = AsyncMock(side_effect=read_trigger_impl) + cast(Any, trigger_manager).read_trigger = read_trigger_mock + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + result = await resumable.execute({}) + + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + assert {t.interrupt_id for t in result.triggers} == {"child-2"} + assert len(result.triggers) == 2 + assert {t.trigger_type for t in result.triggers} == { + UiPathResumeTriggerType.JOB, + UiPathResumeTriggerType.TIMER, + } + assert runtime_impl.execution_count == 2 + assert create_triggers_mock.await_count == 2 + assert read_trigger_mock.await_count == 2 + + saved_triggers = await storage.get_triggers("runtime-1") + assert {t.interrupt_id for t in saved_triggers} == {"child-2"} diff --git a/uv.lock b/uv.lock index 2aaded6..d68907e 100644 --- a/uv.lock +++ b/uv.lock @@ -998,21 +998,21 @@ wheels = [ [[package]] name = "uipath-core" -version = "0.5.26" +version = "0.5.27" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "opentelemetry-instrumentation" }, { name = "opentelemetry-sdk" }, { name = "pydantic" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ae/9d/f2fd705cbe404e53150ebee4a7208df158ea0e307ade455dc2a3ee16fd12/uipath_core-0.5.26.tar.gz", hash = "sha256:c34f1d7bc823e4a45b8e21ae590d74b6ae9e2caab839c855e376c5f38ffa3e29", size = 130421, upload-time = "2026-06-29T15:43:31.882Z" } +sdist = { url = "https://files.pythonhosted.org/packages/4a/9b/ad6b339be46a81df45101624b6b2d6b7c869fc9a2399cfce3f3b0cae7fe0/uipath_core-0.5.27.tar.gz", hash = "sha256:129d9ea351de22dd555b27388258e15d3eb6e2af44691df4cfca05784b65e54b", size = 130488, upload-time = "2026-06-30T13:16:47.185Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/b6/71/b0947b61fe5b45a8642774a02bffa23371c38556fe877b37c8313bbea5f7/uipath_core-0.5.26-py3-none-any.whl", hash = "sha256:ad81607910ccf567721e31c2de557e100a46228f7e1277cfe37d71f6d472b06a", size = 54787, upload-time = "2026-06-29T15:43:30.642Z" }, + { url = "https://files.pythonhosted.org/packages/02/73/600e7ffd400343e4644e227ce081f2c88bb13ee00f2e95762dec0af8125e/uipath_core-0.5.27-py3-none-any.whl", hash = "sha256:5c5ed56f13ca560d84ce49e8deda8b0ef1b0eb64d38155ee3affd5209f7c1afa", size = 54882, upload-time = "2026-06-30T13:16:45.74Z" }, ] [[package]] name = "uipath-runtime" -version = "0.11.5" +version = "0.12.0" source = { editable = "." } dependencies = [ { name = "uipath-core" }, @@ -1034,7 +1034,7 @@ dev = [ ] [package.metadata] -requires-dist = [{ name = "uipath-core", specifier = ">=0.5.26,<0.6.0" }] +requires-dist = [{ name = "uipath-core", specifier = ">=0.5.27,<0.6.0" }] [package.metadata.requires-dev] dev = [