diff --git a/packages/uipath-llamaindex/pyproject.toml b/packages/uipath-llamaindex/pyproject.toml index edcaff6c..36cf900a 100644 --- a/packages/uipath-llamaindex/pyproject.toml +++ b/packages/uipath-llamaindex/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "llama-index-llms-azure-openai>=0.4.2", "openinference-instrumentation-llama-index>=4.3.9", "uipath>=2.10.0, <2.11.0", + "uipath-core>=0.5.18, <0.7.0", "uipath-runtime>=0.11.0, <0.12.0", ] classifiers = [ diff --git a/packages/uipath-llamaindex/src/uipath_llamaindex/governance/__init__.py b/packages/uipath-llamaindex/src/uipath_llamaindex/governance/__init__.py new file mode 100644 index 00000000..914e8096 --- /dev/null +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/governance/__init__.py @@ -0,0 +1,25 @@ +"""Governance integration for ``uipath-llamaindex``. + +Exposes :func:`install_governance` — registers a :class:`GovernanceEventHandler` +on the LlamaIndex root instrumentation dispatcher, which governs LLM/tool events +(BEFORE_MODEL, AFTER_MODEL, TOOL_CALL). Wired into a run by passing an +``evaluator`` to :class:`UiPathLlamaIndexRuntimeFactory`; the factory calls +:func:`install_governance`. + +Importing this module has no side effects: no adapter is registered, no global +state is mutated. +""" + +from __future__ import annotations + +from .event_handler import ( + GovernanceEventHandler, + install_governance, + uninstall_governance, +) + +__all__ = [ + "GovernanceEventHandler", + "install_governance", + "uninstall_governance", +] diff --git a/packages/uipath-llamaindex/src/uipath_llamaindex/governance/event_handler.py b/packages/uipath-llamaindex/src/uipath_llamaindex/governance/event_handler.py new file mode 100644 index 00000000..1b82fd5a --- /dev/null +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/governance/event_handler.py @@ -0,0 +1,352 @@ +"""LlamaIndex governance event handler for UiPath. + +Provides governance for LlamaIndex agents/workflows. 
Unlike the ADK / OpenAI / +Agent-Framework integrations — which install per-agent callbacks or middleware — +LlamaIndex routes everything (LLM calls, tool calls) through its global +**instrumentation dispatcher** (the same mechanism the package already uses for +OpenInference tracing). So this adapter governs by registering a +:class:`GovernanceEventHandler` on the **root dispatcher**, which receives every +event propagated from child dispatchers: + +- ``LLMChatStartEvent`` → BEFORE_MODEL (scans the latest input message) +- ``LLMChatEndEvent`` → AFTER_MODEL (scans the response) +- ``AgentToolCallEvent`` → TOOL_CALL (tool name + arguments) + +The dispatcher is process-global, so registration is process-wide — which fits +the coded-agent model (one workflow per process). :func:`install_governance` +therefore returns the ``agent`` unchanged (nothing is mutated on it); the wiring +lives on the dispatcher. A second install (a reused process serving a new +runtime) **rebinds** that one handler to the new run's evaluator / session +rather than silently ignoring it — the most-recent install governs. +:func:`uninstall_governance` removes the handler so the global dispatcher does +not retain the evaluator after the runtime is gone; the factory calls it on +dispose. + +Because the dispatcher is process-global and LlamaIndex events do not carry a +stable per-run identity, this adapter does not isolate two *concurrently* +executing runtimes in the same process — they would share the latest-installed +evaluator. That is a property of LlamaIndex's global instrumentation and matches +the one-workflow-per-process runtime model. + +LlamaIndex does **not** emit a tool-*end* instrumentation event, so AFTER_TOOL +is not wired here; a tool's result is instead governed at the next +``LLMChatStartEvent`` where it is fed back to the model as input. 
This holds +only when the tool result is the **latest** message in that request (the usual +case — BEFORE_MODEL scans the latest message, see +:func:`_latest_message_text`); if the framework injects later messages before +the next model call, an intervening tool result is not separately scanned. +This is the LlamaIndex analogue of the OpenAI adapter's missing tool-args. + +Chain-level boundaries (BEFORE_AGENT / AFTER_AGENT) are owned by the +governance host and are intentionally not fired here. + +The evaluator protocol comes from ``uipath-core``; this package contributes +only the LlamaIndex-specific wiring. Governance is installed by the runtime +factory: passing an ``evaluator`` to ``new_runtime`` calls +:func:`install_governance`, which registers the handler on the dispatcher. No +adapter registry, no entry point, no import-time side effects. + +Audit emission and enforcement (raising :class:`GovernanceBlockException` on +DENY) are owned by the evaluator. The handler only extracts payloads and calls +the matching ``evaluate_*`` method; :class:`GovernanceBlockException` propagates +(aborting the run), anything else is logged and swallowed. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Dict, List + +from llama_index.core.instrumentation import ( # type: ignore[attr-defined] + get_dispatcher, +) +from llama_index.core.instrumentation.event_handlers.base import ( # type: ignore[attr-defined] + BaseEventHandler, +) +from llama_index.core.instrumentation.events.agent import AgentToolCallEvent +from llama_index.core.instrumentation.events.llm import ( + LLMChatEndEvent, + LLMChatStartEvent, +) +from pydantic import PrivateAttr +from uipath.core.adapters import EvaluatorProtocol +from uipath.core.governance.exceptions import GovernanceBlockException + +logger = logging.getLogger(__name__) + +# Cap on the text blob passed to BEFORE_MODEL / AFTER_MODEL governance +# evaluation. 
# Sized to match the runtime side and the other adapters.
_BEFORE_MODEL_TEXT_CAP = 64000


def install_governance(
    agent: Any,
    evaluator: EvaluatorProtocol,
    *,
    agent_name: str,
    session_id: str,
) -> Any:
    """Wire governance onto the root LlamaIndex dispatcher and return ``agent``.

    Nothing is attached to ``agent`` itself: the handler is registered on the
    dispatcher returned by ``get_dispatcher()``. If a
    :class:`GovernanceEventHandler` is already registered there, that handler
    is rebound to the supplied evaluator / identifiers instead of stacking a
    second one, so the most recent install is the one that governs.

    Args:
        agent: Returned untouched; never inspected here.
        evaluator: Governance evaluator the handler forwards events to.
        agent_name: Name reported to the evaluator on every check.
        session_id: Reported to the evaluator as ``runtime_id``.

    Returns:
        The same ``agent`` object that was passed in.
    """
    root = get_dispatcher()
    already_installed = next(
        (h for h in root.event_handlers if isinstance(h, GovernanceEventHandler)),
        None,
    )
    if already_installed is not None:
        already_installed.rebind(
            evaluator=evaluator, agent_name=agent_name, session_id=session_id
        )
        logger.debug("Rebound existing governance handler to the new runtime")
        return agent

    fresh_callbacks = GovernanceCallbacks(
        evaluator=evaluator, agent_name=agent_name, session_id=session_id
    )
    root.add_event_handler(GovernanceEventHandler(callbacks=fresh_callbacks))
    logger.debug("Registered governance event handler on LlamaIndex dispatcher")
    return agent


def uninstall_governance(agent: Any = None) -> Any:
    """Detach every governance handler from the root dispatcher.

    The handler holds a reference to the evaluator, so leaving it registered
    on the dispatcher would keep the evaluator reachable after the runtime is
    gone. Calling this when no governance handler is registered is a no-op.

    Args:
        agent: Optional pass-through value; never inspected.

    Returns:
        ``agent``, unchanged.
    """
    root = get_dispatcher()
    registered = root.event_handlers
    kept = [h for h in registered if not isinstance(h, GovernanceEventHandler)]
    if len(kept) == len(registered):
        # Nothing of ours was registered; leave the list untouched.
        return agent
    # Slice-assign so the dispatcher keeps the same list object rather than
    # having its ``event_handlers`` attribute re-assigned on the model.
    registered[:] = kept
    logger.debug("Removed governance event handler from LlamaIndex dispatcher")
    return agent


class GovernanceEventHandler(BaseEventHandler):
    """Forward the governance-relevant LlamaIndex events to the callbacks.

    ``BaseEventHandler`` is a pydantic model, so the callback object is kept
    in a private attribute rather than a declared field. Only three event
    types are acted on (chat start, chat end, agent tool call); every other
    event is ignored.
    """

    _callbacks: "GovernanceCallbacks" = PrivateAttr()

    def __init__(self, callbacks: "GovernanceCallbacks", **data: Any) -> None:
        super().__init__(**data)
        self._callbacks = callbacks

    @classmethod
    def class_name(cls) -> str:
        return "GovernanceEventHandler"

    def rebind(
        self,
        evaluator: EvaluatorProtocol,
        agent_name: str,
        session_id: str,
    ) -> None:
        """Point this handler's callbacks at a new evaluator / run identity."""
        self._callbacks.rebind(
            evaluator=evaluator, agent_name=agent_name, session_id=session_id
        )

    def handle(self, event: Any, **kwargs: Any) -> Any:
        """Route one instrumentation event to the matching governance hook.

        The check runs inline with the instrumented call on purpose: a
        BEFORE_MODEL / TOOL_CALL decision has to finish (and be able to raise)
        before the underlying LLM / tool call proceeds, so the evaluator is
        expected to be fast. Always returns ``None``.
        """
        # NOTE(review): enforcement relies on an exception raised here reaching
        # the instrumented call. Confirm that ``Dispatcher.event`` in the
        # pinned llama-index-core does not wrap ``handler.handle`` in a
        # catch-all; if it does, a GovernanceBlockException raised below is
        # swallowed and a DENY would not actually stop the call.
        sink = self._callbacks
        if isinstance(event, LLMChatStartEvent):
            sink.before_model(event.messages)
            return None
        if isinstance(event, LLMChatEndEvent):
            sink.after_model(event.response)
            return None
        if isinstance(event, AgentToolCallEvent):
            sink.tool_call(event.tool, event.arguments)
        return None


class GovernanceCallbacks:
    """Evaluator plus per-run counters, invoked by the event handler.

    Every hook re-raises :class:`GovernanceBlockException` (that is the DENY
    path) and logs-then-swallows any other exception, so a fault inside the
    governance layer does not break the agent run.
    """

    def __init__(
        self,
        evaluator: EvaluatorProtocol,
        agent_name: str,
        session_id: str,
    ) -> None:
        self._evaluator = evaluator
        self._agent_name = agent_name
        self._session_id = session_id
        # No ``trace_id`` is stored on purpose: a single id minted at install
        # time would be identical for every call. Trace correlation is left to
        # the layer below, which resolves it at call time.
        self._session_state: Dict[str, Any] = self._fresh_state()

    @staticmethod
    def _fresh_state() -> Dict[str, Any]:
        """Return a new zeroed counter dict for one run."""
        return {"tool_calls": 0, "llm_calls": 0}

    def _bump(self, counter: str) -> None:
        """Increment one per-run counter, treating a missing key as zero."""
        state = self._session_state
        state[counter] = state.get(counter, 0) + 1

    def rebind(
        self,
        evaluator: EvaluatorProtocol,
        agent_name: str,
        session_id: str,
    ) -> None:
        """Switch to a new run: swap evaluator / identifiers, reset counters.

        The counters are replaced with a brand-new dict so nothing carries
        over from the previous run.
        """
        self._evaluator = evaluator
        self._agent_name = agent_name
        self._session_id = session_id
        self._session_state = self._fresh_state()

    def before_model(self, messages: Any) -> None:
        """Evaluate BEFORE_MODEL on the text of the most recent input message."""
        try:
            self._evaluator.evaluate_before_model(
                model_input=_latest_message_text(messages),
                agent_name=self._agent_name,
                runtime_id=self._session_id,
            )
            # Reached only when the evaluator returned normally, so a blocked
            # (or failed) check never inflates the counter.
            self._bump("llm_calls")
        except GovernanceBlockException:
            raise
        except Exception as err:  # noqa: BLE001 - governance must not break the run
            logger.warning("before_model governance check failed (continuing): %s", err)

    def after_model(self, response: Any) -> None:
        """Evaluate AFTER_MODEL on the text extracted from the chat response."""
        try:
            self._evaluator.evaluate_after_model(
                model_output=_response_text(response),
                agent_name=self._agent_name,
                runtime_id=self._session_id,
            )
        except GovernanceBlockException:
            raise
        except Exception as err:  # noqa: BLE001
            logger.warning("after_model governance check failed (continuing): %s", err)

    def tool_call(self, tool: Any, arguments: Any) -> None:
        """Evaluate TOOL_CALL with the tool's name and its coerced arguments."""
        try:
            self._evaluator.evaluate_tool_call(
                tool_name=getattr(tool, "name", None) or "unknown",
                tool_args=_coerce_args(arguments),
                agent_name=self._agent_name,
                runtime_id=self._session_id,
                session_state=self._session_state,
            )
            # The evaluator is handed the count of *prior* tool calls; the
            # bump happens only after it returns without raising.
            self._bump("tool_calls")
        except GovernanceBlockException:
            raise
        except Exception as err:  # noqa: BLE001
            logger.warning("tool_call governance check failed (continuing): %s", err)
+ self._session_state["tool_calls"] = ( + self._session_state.get("tool_calls", 0) + 1 + ) + except GovernanceBlockException: + raise + except Exception as e: # noqa: BLE001 + logger.warning("tool_call governance check failed (continuing): %s", e) + + +# -------------------------------------------------------------------------- +# Text / argument extraction +# -------------------------------------------------------------------------- + + +def _latest_message_text(messages: Any) -> str: + """Text of the most-recent message in a chat request.""" + if not messages: + return "" + if isinstance(messages, (list, tuple)): + return _message_text(messages[-1]) + return _message_text(messages) + + +def _message_text(message: Any) -> str: + """Pull text from a ``ChatMessage`` (``.content`` / ``.blocks``) or a str.""" + if message is None: + return "" + if isinstance(message, str): + return message[:_BEFORE_MODEL_TEXT_CAP] + content = getattr(message, "content", None) + if isinstance(content, str) and content: + return content[:_BEFORE_MODEL_TEXT_CAP] + # Multimodal ChatMessage carries typed blocks. Walk them for text (a + # TextBlock exposes ``.text``) rather than ``str(message)``, which would + # serialize the pydantic repr — dict-syntax noise that pollutes the + # regex-scanned blob. Non-text blocks (image/binary) have no scannable text. 
+ blocks = getattr(message, "blocks", None) + if isinstance(blocks, (list, tuple)): + texts = [ + t for b in blocks if isinstance((t := getattr(b, "text", None)), str) and t + ] + if texts: + return "\n".join(texts)[:_BEFORE_MODEL_TEXT_CAP] + return str(message)[:_BEFORE_MODEL_TEXT_CAP] + + +def _response_text(response: Any) -> str: + """Pull assistant text from a ``ChatResponse`` (``.message.content``).""" + if response is None: + return "" + message = getattr(response, "message", None) + if message is not None: + return _message_text(message) + text = getattr(response, "text", None) + if isinstance(text, str): + return text[:_BEFORE_MODEL_TEXT_CAP] + return str(response)[:_BEFORE_MODEL_TEXT_CAP] + + +def _coerce_args(arguments: Any) -> Dict[str, Any]: + """Normalise tool arguments (JSON string / Mapping / list / None) to a dict. + + ``AgentToolCallEvent.arguments`` is usually a JSON-encoded string; other + call sites may hand a dict directly. Non-dict payloads are preserved (not + dropped) so an arg-based policy can still scan them: a list-shaped arg + (common with MCP tools) is wrapped under ``_``, and malformed JSON is kept + raw under ``_raw`` — a payload governance can't parse must not be a way to + slip past it. + """ + if arguments is None: + return {} + if isinstance(arguments, dict): + return arguments + if isinstance(arguments, str): + try: + parsed = json.loads(arguments) + return parsed if isinstance(parsed, dict) else {"_": parsed} + except (TypeError, ValueError): + return {"_raw": arguments} + # list / tuple / other structured args — preserve rather than drop to {}. 
+ return {"_": arguments} + + +__all__: List[str] = [ + "GovernanceCallbacks", + "GovernanceEventHandler", + "install_governance", + "uninstall_governance", +] diff --git a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/factory.py b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/factory.py index d9535faf..db22f73f 100644 --- a/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/factory.py +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/runtime/factory.py @@ -7,6 +7,7 @@ LlamaIndexInstrumentor, get_current_span, ) +from uipath.core.adapters import EvaluatorProtocol from uipath.core.tracing import UiPathSpanUtils, UiPathTraceManager from uipath.platform.resume_triggers import UiPathResumeTriggerHandler from uipath.runtime import ( @@ -19,6 +20,7 @@ from uipath.runtime.errors import UiPathErrorCategory from workflows import Workflow +from uipath_llamaindex.governance import install_governance, uninstall_governance from uipath_llamaindex.runtime._telemetry import ( ToolCallAttributeNormalizer, ) @@ -233,6 +235,7 @@ async def _create_runtime_instance( workflow: Workflow, runtime_id: str, entrypoint: str, + evaluator: EvaluatorProtocol | None = None, ) -> UiPathRuntimeProtocol: """ Create a runtime instance from a workflow. @@ -241,10 +244,19 @@ async def _create_runtime_instance( workflow: The workflow runtime_id: Unique identifier for the runtime instance entrypoint: Workflow entrypoint name + evaluator: When supplied, governance is installed on the + instrumentation dispatcher via :func:`install_governance`. Returns: Configured runtime instance """ + if evaluator is not None: + install_governance( + workflow, + evaluator, + agent_name=entrypoint, + session_id=runtime_id, + ) storage = await self._get_storage() @@ -274,6 +286,9 @@ async def new_runtime( Args: entrypoint: Workflow name from llama_index.json runtime_id: Unique identifier for the runtime instance + **kwargs: Forwarded factory kwargs. 
Recognized: ``evaluator`` + (``EvaluatorProtocol``) — when present, governance is installed + on the dispatcher via :func:`install_governance`. Returns: Configured runtime instance with workflow @@ -284,10 +299,16 @@ async def new_runtime( workflow=workflow, runtime_id=runtime_id, entrypoint=entrypoint, + evaluator=kwargs.get("evaluator"), ) async def dispose(self) -> None: """Cleanup factory resources.""" + # The governance handler lives on the process-global instrumentation + # dispatcher; remove it so the evaluator (and its resources) are not + # retained after the runtime is gone. + uninstall_governance() + for loader in self._workflow_loaders.values(): await loader.cleanup() diff --git a/packages/uipath-llamaindex/tests/governance/__init__.py b/packages/uipath-llamaindex/tests/governance/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/packages/uipath-llamaindex/tests/governance/test_event_handler.py b/packages/uipath-llamaindex/tests/governance/test_event_handler.py new file mode 100644 index 00000000..e607d358 --- /dev/null +++ b/packages/uipath-llamaindex/tests/governance/test_event_handler.py @@ -0,0 +1,423 @@ +"""Unit tests for the LlamaIndex governance event handler. + +The adapter governs via the LlamaIndex instrumentation dispatcher, so these +tests exercise the real event types (``LLMChatStartEvent`` etc.) routed +through :class:`GovernanceEventHandler`, plus the adapter's install/uninstall on +the dispatcher. The dispatcher is process-global, so each dispatcher test +cleans up after itself via ``uninstall_governance``.
+""" + +from __future__ import annotations + +import logging +from types import SimpleNamespace +from typing import Any, List + +import pytest +from llama_index.core.base.llms.types import ChatMessage, ChatResponse +from llama_index.core.instrumentation import ( # type: ignore[attr-defined] + get_dispatcher, +) +from llama_index.core.instrumentation.events.agent import AgentToolCallEvent +from llama_index.core.instrumentation.events.llm import ( + LLMChatEndEvent, + LLMChatStartEvent, +) +from llama_index.core.tools.types import ToolMetadata +from uipath.core.governance.exceptions import GovernanceBlockException + +from uipath_llamaindex.governance.event_handler import ( + _BEFORE_MODEL_TEXT_CAP, + GovernanceCallbacks, + GovernanceEventHandler, + _coerce_args, + install_governance, + uninstall_governance, +) + +# -------------------------------------------------------------------------- +# Fakes +# -------------------------------------------------------------------------- + + +class FakeEvaluator: + """Records evaluate_* calls; optionally BLOCKs on a named hook.""" + + def __init__(self, block_on: str | None = None) -> None: + self.block_on = block_on + self.calls: List[tuple[str, dict[str, Any]]] = [] + + def _record(self, hook: str, **kwargs: Any) -> None: + self.calls.append((hook, kwargs)) + if self.block_on == hook: + raise GovernanceBlockException("blocked") + + def evaluate_before_agent(self, *args: Any, **kwargs: Any) -> Any: + self._record("before_agent", **kwargs) + + def evaluate_after_agent(self, *args: Any, **kwargs: Any) -> Any: + self._record("after_agent", **kwargs) + + def evaluate_before_model(self, *args: Any, **kwargs: Any) -> Any: + self._record("before_model", **kwargs) + + def evaluate_after_model(self, *args: Any, **kwargs: Any) -> Any: + self._record("after_model", **kwargs) + + def evaluate_tool_call(self, *args: Any, **kwargs: Any) -> Any: + self._record("tool_call", **kwargs) + + def evaluate_after_tool(self, *args: Any, **kwargs: Any) 
-> Any: + self._record("after_tool", **kwargs) + + +class FakeWorkflow: + """Duck-typed LlamaIndex workflow stand-in.""" + + async def run(self, *_a: Any, **_k: Any) -> None: + return None + + +def _make_callbacks(ev: FakeEvaluator) -> GovernanceCallbacks: + return GovernanceCallbacks(evaluator=ev, agent_name="agent-1", session_id="sess-1") + + +def _handler(ev: FakeEvaluator) -> GovernanceEventHandler: + return GovernanceEventHandler(callbacks=_make_callbacks(ev)) + + +# -------------------------------------------------------------------------- +# install_governance (real dispatcher) +# -------------------------------------------------------------------------- + + +def _gov_handlers() -> list[Any]: + return [ + h + for h in get_dispatcher().event_handlers + if isinstance(h, GovernanceEventHandler) + ] + + +def _clear_gov_handlers() -> None: + # Use the adapter's own public detach rather than mutating the dispatcher. + uninstall_governance() + + +def test_install_governance_registers_handler(): + agent = FakeWorkflow() + try: + returned = install_governance( + agent, FakeEvaluator(), agent_name="x", session_id="s" + ) + assert returned is agent + assert len(_gov_handlers()) == 1 + finally: + _clear_gov_handlers() + assert _gov_handlers() == [] + + +def test_install_governance_reinstall_rebinds_single_handler(): + """The dispatcher is process-global: a second install keeps one handler but + rebinds it to the new run's evaluator / session (last install wins).""" + try: + install_governance( + FakeWorkflow(), FakeEvaluator(), agent_name="a", session_id="s1" + ) + handlers = _gov_handlers() + assert len(handlers) == 1 + gov = handlers[0] + assert gov._callbacks._session_id == "s1" + + ev2 = FakeEvaluator() + install_governance(FakeWorkflow(), ev2, agent_name="b", session_id="s2") + handlers = _gov_handlers() + assert len(handlers) == 1 # not stacked + assert handlers[0] is gov # same handler, rebound + assert gov._callbacks._session_id == "s2" + assert 
gov._callbacks._evaluator is ev2 + finally: + _clear_gov_handlers() + + +def test_uninstall_governance_removes_handler(): + install_governance(FakeWorkflow(), FakeEvaluator(), agent_name="x", session_id="s") + assert len(_gov_handlers()) == 1 + uninstall_governance() + assert _gov_handlers() == [] + # safe to call again when nothing is registered + uninstall_governance() + assert _gov_handlers() == [] + + +# -------------------------------------------------------------------------- +# Factory wiring — the evaluator kwarg drives install_governance +# -------------------------------------------------------------------------- + + +def _factory_without_init(): + """A factory instance that skips __init__ (avoids config/IO).""" + from uipath_llamaindex.runtime.factory import UiPathLlamaIndexRuntimeFactory + + f = UiPathLlamaIndexRuntimeFactory.__new__(UiPathLlamaIndexRuntimeFactory) + f.context = SimpleNamespace(command="run") # type: ignore[assignment] # read for debug_mode + return f + + +def _stub_factory_runtime(monkeypatch, factory_mod): + """Stub the runtime constructions + storage so only the governance branch runs.""" + monkeypatch.setattr( + factory_mod, "UiPathLlamaIndexRuntime", lambda **kw: SimpleNamespace(**kw) + ) + monkeypatch.setattr( + factory_mod, "UiPathResumableRuntime", lambda **kw: SimpleNamespace(**kw) + ) + monkeypatch.setattr(factory_mod, "UiPathResumeTriggerHandler", lambda *a, **k: None) + + async def _no_storage(self): + return None + + monkeypatch.setattr( + factory_mod.UiPathLlamaIndexRuntimeFactory, "_get_storage", _no_storage + ) + + +async def test_factory_installs_governance_when_evaluator_supplied(monkeypatch): + from uipath_llamaindex.runtime import factory as factory_mod + + _stub_factory_runtime(monkeypatch, factory_mod) + try: + await _factory_without_init()._create_runtime_instance( + workflow=FakeWorkflow(), + runtime_id="r", + entrypoint="e", + evaluator=FakeEvaluator(), + ) + assert len(_gov_handlers()) == 1 + finally: + 
_clear_gov_handlers() + + +async def test_factory_skips_governance_without_evaluator(monkeypatch): + from uipath_llamaindex.runtime import factory as factory_mod + + _stub_factory_runtime(monkeypatch, factory_mod) + await _factory_without_init()._create_runtime_instance( + workflow=FakeWorkflow(), runtime_id="r", entrypoint="e" + ) + assert _gov_handlers() == [] + + +# -------------------------------------------------------------------------- +# event routing through the handler +# -------------------------------------------------------------------------- + + +def test_handler_routes_llm_chat_start_to_before_model(): + ev = FakeEvaluator() + h = _handler(ev) + event = LLMChatStartEvent( + messages=[ + ChatMessage(role="user", content="old"), + ChatMessage(role="user", content="the question"), + ], + additional_kwargs={}, + model_dict={}, + ) + h.handle(event) + hook, kwargs = ev.calls[-1] + assert hook == "before_model" + assert kwargs["model_input"] == "the question" # latest only + + +def test_handler_routes_llm_chat_end_to_after_model(): + ev = FakeEvaluator() + h = _handler(ev) + event = LLMChatEndEvent( + messages=[ChatMessage(role="user", content="q")], + response=ChatResponse( + message=ChatMessage(role="assistant", content="the answer") + ), + ) + h.handle(event) + hook, kwargs = ev.calls[-1] + assert hook == "after_model" + assert kwargs["model_output"] == "the answer" + + +def test_handler_routes_tool_call(): + ev = FakeEvaluator() + h = _handler(ev) + event = AgentToolCallEvent( + tool=ToolMetadata(description="d", name="transfer"), + arguments='{"amount": 50}', + ) + h.handle(event) + hook, kwargs = ev.calls[-1] + assert hook == "tool_call" + assert kwargs["tool_name"] == "transfer" + assert kwargs["tool_args"] == {"amount": 50} + assert kwargs["session_state"]["tool_calls"] == 1 + + +def test_handler_ignores_unrelated_events(): + ev = FakeEvaluator() + h = _handler(ev) + h.handle(object()) # not a governance-relevant event + assert ev.calls == [] + + 
+# -------------------------------------------------------------------------- +# text / arg extraction +# -------------------------------------------------------------------------- + + +def test_before_model_caps_text(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + huge = "x" * (_BEFORE_MODEL_TEXT_CAP + 5000) + cb.before_model([ChatMessage(role="user", content=huge)]) + assert len(ev.calls[-1][1]["model_input"]) <= _BEFORE_MODEL_TEXT_CAP + + +def test_before_model_empty(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + cb.before_model([]) + assert ev.calls[-1][1]["model_input"] == "" + + +def test_coerce_args_json_string(): + assert _coerce_args('{"a": 1}') == {"a": 1} + + +def test_coerce_args_dict_passthrough(): + assert _coerce_args({"a": 1}) == {"a": 1} + + +def test_coerce_args_none_and_bad(): + assert _coerce_args(None) == {} + # malformed JSON is preserved raw (not dropped) so policies can still scan it + assert _coerce_args("not json") == {"_raw": "not json"} + + +def test_coerce_args_preserves_list_shaped_args(): + # list-shaped tool args (common with MCP tools) must not be dropped to {} + assert _coerce_args(["a", "b"]) == {"_": ["a", "b"]} + assert _coerce_args('["a", "b"]') == {"_": ["a", "b"]} + + +def test_message_text_walks_blocks_when_content_empty(): + # a multimodal message whose .content is empty falls back to its text + # blocks, not str(message) (which would serialize a pydantic repr) + from uipath_llamaindex.governance.event_handler import _message_text + + msg = SimpleNamespace( + content=None, + blocks=[SimpleNamespace(text="block one"), SimpleNamespace(text="block two")], + ) + assert _message_text(msg) == "block one\nblock two" + + +# -------------------------------------------------------------------------- +# enforcement semantics +# -------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "hook,invoke", + [ + ( + "before_model", + lambda cb: 
cb.before_model([ChatMessage(role="user", content="hi")]), + ), + ( + "after_model", + lambda cb: cb.after_model( + ChatResponse(message=ChatMessage(role="assistant", content="o")) + ), + ), + ( + "tool_call", + lambda cb: cb.tool_call(ToolMetadata(description="d", name="t"), "{}"), + ), + ], +) +def test_block_exception_propagates(hook, invoke): + cb = _make_callbacks(FakeEvaluator(block_on=hook)) + with pytest.raises(GovernanceBlockException): + invoke(cb) + + +def test_non_block_exception_is_swallowed(caplog): + class Boom: + def evaluate_before_model(self, **_: Any) -> None: + raise RuntimeError("evaluator bug") + + cb = GovernanceCallbacks(evaluator=Boom(), agent_name="a", session_id="s") # type: ignore[arg-type] + # Attach caplog's handler directly to the module logger: other suites in the + # full run can configure an ancestor ``uipath*`` logger with + # propagate=False, which breaks caplog's default root-handler capture. + logger = logging.getLogger("uipath_llamaindex.governance.event_handler") + logger.addHandler(caplog.handler) + prev = logger.level + logger.setLevel(logging.WARNING) + try: + cb.before_model([ChatMessage(role="user", content="x")]) + finally: + logger.removeHandler(caplog.handler) + logger.setLevel(prev) + assert any("governance check failed" in r.message for r in caplog.records) + + +# -------------------------------------------------------------------------- +# coverage: swallow on after_model/tool_call + extraction edges +# -------------------------------------------------------------------------- + + +class _Boom: + """Evaluator whose every evaluate_* raises a non-block error.""" + + def __getattr__(self, _name: str) -> Any: + def _raise(*_a: Any, **_k: Any) -> None: + raise RuntimeError("evaluator bug") + + return _raise + + +def test_after_model_and_tool_call_swallow_non_block_errors(caplog): + cb = GovernanceCallbacks(evaluator=_Boom(), agent_name="a", session_id="s") + logger = 
logging.getLogger("uipath_llamaindex.governance.event_handler") + logger.addHandler(caplog.handler) + prev = logger.level + logger.setLevel(logging.WARNING) + try: + cb.after_model(SimpleNamespace(message=SimpleNamespace(content="x"))) + cb.tool_call(SimpleNamespace(name="t"), {}) + finally: + logger.removeHandler(caplog.handler) + logger.setLevel(prev) + assert sum("governance check failed" in r.message for r in caplog.records) >= 2 + + +def test_extraction_edges(): + from uipath_llamaindex.governance.event_handler import ( + _latest_message_text, + _message_text, + _response_text, + ) + + # _message_text: None / str / object with no content or blocks -> str() + assert _message_text(None) == "" + assert _message_text("plain") == "plain" + assert isinstance(_message_text(SimpleNamespace(content=None, blocks=None)), str) + # _latest_message_text: single (non-list) message + assert _latest_message_text(SimpleNamespace(content="solo")) == "solo" + # _response_text: None / .message / .text fallback / str() fallback + assert _response_text(None) == "" + assert ( + _response_text(SimpleNamespace(message=SimpleNamespace(content="viamsg"))) + == "viamsg" + ) + assert _response_text(SimpleNamespace(message=None, text="viatext")) == "viatext" + assert isinstance(_response_text(SimpleNamespace(message=None, text=None)), str) diff --git a/packages/uipath-llamaindex/uv.lock b/packages/uipath-llamaindex/uv.lock index f879a865..04634918 100644 --- a/packages/uipath-llamaindex/uv.lock +++ b/packages/uipath-llamaindex/uv.lock @@ -3492,16 +3492,16 @@ wheels = [ [[package]] name = "uipath-core" -version = "0.5.18" +version = "0.5.28" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "opentelemetry-instrumentation" }, { name = "opentelemetry-sdk" }, { name = "pydantic" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/14/b1/d4e555a1a2ccf298195a5f2968e538b0cea8592b3e03f43fc12b178d6c69/uipath_core-0.5.18.tar.gz", hash = 
"sha256:63ebe8bdb818ca30a4bc9ab0ea8171315680691429931282939359ce039401ab", size = 131988, upload-time = "2026-06-08T14:04:49.688Z" } +sdist = { url = "https://files.pythonhosted.org/packages/4c/f9/8d2f1d98cbebbcf059cf4561f38f34ad4cd58423e4f15cad22bd297a2563/uipath_core-0.5.28.tar.gz", hash = "sha256:942987f6b612c64f93d612ad7b242276ed75f129fdd8f25bc71c24ec8887e388", size = 130578, upload-time = "2026-06-30T14:04:48.841Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/57/de/1a820b33f7bff4565d7649772bc54c88480ac7e70f707097f7da37d05157/uipath_core-0.5.18-py3-none-any.whl", hash = "sha256:351d6faeecfc6a0acea93182e01526f39c04a77e09fa0444be5f4fb580463f5a", size = 54572, upload-time = "2026-06-08T14:04:48.22Z" }, + { url = "https://files.pythonhosted.org/packages/e8/1e/385bb166232a57ebe938cc57ad2717f350bc922bb5d2ce31af84306b7569/uipath_core-0.5.28-py3-none-any.whl", hash = "sha256:b952a46a21710073cbc16d6d5684e9aa645c107f57a636b778cfb94aa81a1e48", size = 54980, upload-time = "2026-06-30T14:04:47.374Z" }, ] [[package]] @@ -3516,6 +3516,7 @@ dependencies = [ { name = "llama-index-workflows" }, { name = "openinference-instrumentation-llama-index" }, { name = "uipath" }, + { name = "uipath-core" }, { name = "uipath-runtime" }, ] @@ -3560,6 +3561,7 @@ requires-dist = [ { name = "llama-index-workflows", specifier = ">=2.18.0,<3.0.0" }, { name = "openinference-instrumentation-llama-index", specifier = ">=4.3.9" }, { name = "uipath", specifier = ">=2.10.0,<2.11.0" }, + { name = "uipath-core", specifier = ">=0.5.18,<0.7.0" }, { name = "uipath-runtime", specifier = ">=0.11.0,<0.12.0" }, ] provides-extras = ["bedrock", "vertex"]