diff --git a/src/uipath/runtime/governance/_audit/__init__.py b/src/uipath/runtime/governance/_audit/__init__.py new file mode 100644 index 0000000..ce109ef --- /dev/null +++ b/src/uipath/runtime/governance/_audit/__init__.py @@ -0,0 +1,12 @@ +"""Audit sink framework for governance events. + +Internal module. Provides a pluggable audit system that emits +governance events to one or more sinks. The built-in +:class:`TracesAuditSink` emits OpenTelemetry spans and is always +registered by every :class:`AuditManager` — it carries the governance +audit trail and cannot be disabled by application code. + +Callers import from the submodules directly (``_audit.base``, +``_audit.traces``, ``_audit.factory``). This package exposes no +aggregated symbols. +""" diff --git a/src/uipath/runtime/governance/_audit/base.py b/src/uipath/runtime/governance/_audit/base.py new file mode 100644 index 0000000..a8a47b2 --- /dev/null +++ b/src/uipath/runtime/governance/_audit/base.py @@ -0,0 +1,520 @@ +"""Base classes and models for the audit sink framework. + +This module provides the core abstractions for the governance audit system: +- AuditEvent: The data model for audit events +- EventType: Constants for common event types +- AuditSink: Abstract base class for sink implementations +- AuditManager: Central hub for routing events to sinks + +Sink dispatch is synchronous on the caller's thread. Sinks that need +async export (HTTP, batched I/O) own that concern internally — the +OTel traces sink rides on opentelemetry-sdk's BatchSpanProcessor, +which handles export off the caller's thread. +""" + +from __future__ import annotations + +import json +import logging +import threading +from abc import ABC, abstractmethod +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from typing import Any + +from uipath.core.governance import EnforcementMode + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Audit Event Model +# ============================================================================= + + +@dataclass +class AuditEvent: + """Generic audit event that can be sent to any sink. + + Trace correlation is intentionally absent from this dataclass. + Sinks that need a trace id resolve one at their own boundary: + OTel-backed sinks read the live span from the caller's + ``contextvars`` directly (sink dispatch runs synchronously on the + caller's thread, so ``trace.get_current_span()`` resolves to the + agent's live span), and HTTP sinks defer to their injected + provider, which resolves at HTTP-call time. + + Attributes: + event_type: Type of event (e.g., "rule_evaluation", "hook_summary") + timestamp: When the event occurred (auto-set if not provided) + agent_name: Name of the agent being governed + hook: Lifecycle hook where event occurred (optional) + data: Event-specific data dictionary + metadata: Additional metadata for filtering/routing + """ + + event_type: str + agent_name: str = "unknown" + hook: str = "" + data: dict[str, Any] = field(default_factory=dict) + metadata: dict[str, Any] = field(default_factory=dict) + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + result = asdict(self) + result["timestamp"] = self.timestamp.isoformat() + return result + + def to_json(self) -> str: + """Convert to JSON string.""" + return json.dumps(self.to_dict()) + + +class EventType: + """Constants for common event types.""" + + RULE_EVALUATION = "rule_evaluation" + HOOK_START = "hook_start" + HOOK_END = "hook_end" + SESSION_START = "session_start" + SESSION_END = "session_end" + POLICY_VIOLATION = "policy_violation" + POLICY_ALLOW = "policy_allow" + PACKS_LOADED = "packs_loaded" + + +# ============================================================================= +# Audit Sink Base Class +# ============================================================================= + + +class AuditSink(ABC): + """Abstract base class for audit output destinations. + + Subclass this to create custom audit sinks. Each sink receives + all audit events and decides how to handle them. + + Sinks that perform network I/O should batch internally — :meth:`emit` + runs on the caller's thread (typically an agent hook), so a slow + synchronous sink blocks the agent. The standard pattern is the one + opentelemetry-sdk uses for its trace exporter: enqueue in-process, + drain on a sink-owned background thread. + + Example: + A Slack sink that posts on rule denials. ``emit`` enqueues onto + an in-process queue; a daemon thread the sink owns drains the + queue and runs the HTTP POST off the caller's thread. + + class SlackAuditSink(AuditSink): + def __init__(self, webhook_url: str): + self.webhook_url = webhook_url + self._name = "slack" + self._queue: queue.Queue[AuditEvent | None] = queue.Queue() + self._worker = threading.Thread( + target=self._drain, name="slack-audit", daemon=True + ) + self._worker.start() + + @property + def name(self) -> str: + return self._name + + def accepts(self, event: AuditEvent) -> bool: + # Only ship denials — drops irrelevant events at the + # boundary instead of forwarding them to the queue. + return ( + event.data.get("matched") + and event.data.get("action") == "deny" + ) + + def emit(self, event: AuditEvent) -> None: + # Non-blocking — runs on the caller's hook thread. + self._queue.put_nowait(event) + + def _drain(self) -> None: + while True: + event = self._queue.get() + if event is None: + return # close() sentinel + try: + requests.post(self.webhook_url, json=event.to_dict()) + except Exception: + pass # log/retry per sink's own policy + finally: + self._queue.task_done() + + def flush(self) -> None: + self._queue.join() + + def close(self) -> None: + self._queue.put_nowait(None) + self._worker.join(timeout=2.0) + """ + + @property + @abstractmethod + def name(self) -> str: + """Unique name for this sink.""" + pass + + @abstractmethod + def emit(self, event: AuditEvent) -> None: + """Emit an audit event to this sink. + + Args: + event: The audit event to emit + + Note: + Implementations should handle errors gracefully and not + raise exceptions that would disrupt governance evaluation. + """ + pass + + def flush(self) -> None: + """Flush any buffered events. + + Override if sink buffers events before writing. + """ + return + + def close(self) -> None: + """Clean up resources. + + Override if sink holds resources that need cleanup. + """ + return + + def accepts(self, event: AuditEvent) -> bool: + """Check if this sink should receive the event. + + Override to filter events. Default accepts all events. + + Args: + event: The audit event to check + + Returns: + True if sink should receive event, False to skip + """ + return True + + +# ============================================================================= +# Audit Manager +# ============================================================================= + + +class AuditManager: + """Manages multiple audit sinks and routes events to them. + + Instance-scoped: each :class:`GovernanceRuntime` owns its own + manager. Parallel runtimes (``uipath eval``) don't share sinks or + per-sink failure state. + + Constructor automatically registers the always-on ``traces`` sink, + which carries the governance audit trail and cannot be disabled by + application code. Additional sinks can be added via + :meth:`register_sink`. + + Thread Safety: + :meth:`emit` dispatches synchronously on the caller's thread. + Sinks that need to avoid blocking the caller (HTTP exporters) + own their own batching — the OTel traces sink, for example, + rides on opentelemetry-sdk's BatchSpanProcessor. + """ + + # Trip a sink after this many consecutive emit failures (circuit-breaker). + _SINK_FAILURE_THRESHOLD = 10 + + def __init__(self, register_default_sinks: bool = True) -> None: + """Initialize the audit manager. + + Args: + register_default_sinks: If True (default), register the + always-on ``traces`` sink. Tests that want a bare + manager can pass ``False`` and register sinks + explicitly. + """ + self._sinks: list[AuditSink] = [] + # Guards _sinks, _sink_failures, _tripped_sinks — all read + + # mutated by emit() across threads when concurrent agent hooks + # share one manager. + self._sinks_lock = threading.Lock() + # Per-sink consecutive-failure counter, keyed by sink name. + self._sink_failures: dict[str, int] = {} + self._tripped_sinks: set[str] = set() + + if register_default_sinks: + self._register_traces_sink() + + def _register_traces_sink(self) -> None: + """Register the always-on ``traces`` sink. + + Registered for every manager and cannot be disabled by + application code — it carries the governance audit trail. The + factory import is deferred to avoid a module-load cycle + (``factory`` imports back into this module). + """ + from .factory import create_sink + + sink = create_sink("traces") + if sink is not None: + self.register_sink(sink) + logger.info("Governance audit sink registered: traces") + + def register_sink(self, sink: AuditSink) -> None: + """Register an audit sink. + + Args: + sink: The sink to register + + Note: + Duplicate sinks (same name) are ignored. + The circuit-breaker failure counter is cleared so a freshly + registered sink doesn't inherit a previous instance's tripped + state. ``unregister_sink`` already clears these, but the + defensive reset here guards against external manipulation + of the internal counters (tests, future callers). + """ + with self._sinks_lock: + if any(s.name == sink.name for s in self._sinks): + logger.debug("Sink '%s' already registered, skipping", sink.name) + return + self._sinks.append(sink) + self._sink_failures.pop(sink.name, None) + self._tripped_sinks.discard(sink.name) + logger.info("Registered audit sink: %s", sink.name) + + def unregister_sink(self, name: str) -> bool: + """Unregister an audit sink by name. + + Args: + name: Name of the sink to remove + + Returns: + True if sink was removed, False if not found + """ + sink_to_close: AuditSink | None = None + with self._sinks_lock: + for i, sink in enumerate(self._sinks): + if sink.name == name: + sink_to_close = sink + del self._sinks[i] + self._sink_failures.pop(name, None) + self._tripped_sinks.discard(name) + break + if sink_to_close is not None: + try: + sink_to_close.close() + except Exception as e: + logger.warning("Audit sink '%s' failed to close: %s", name, e) + logger.info("Unregistered audit sink: %s", name) + return True + return False + + def get_sink(self, name: str) -> AuditSink | None: + """Get a registered sink by name.""" + with self._sinks_lock: + for sink in self._sinks: + if sink.name == name: + return sink + return None + + def list_sinks(self) -> list[str]: + """Get names of all registered sinks.""" + with self._sinks_lock: + return [s.name for s in self._sinks] + + def emit(self, event: AuditEvent) -> None: + """Dispatch ``event`` synchronously to every live sink. + + Per-sink errors are caught and folded into the circuit breaker + — a sink that fails too many times in a row is skipped for the + rest of the manager's lifetime. The caller never sees a sink + exception. + + Args: + event: The audit event to emit + """ + with self._sinks_lock: + sinks = list(self._sinks) + tripped = set(self._tripped_sinks) + for sink in sinks: + if sink.name in tripped: + continue + try: + if sink.accepts(event): + sink.emit(event) + # Success — reset failure counter for this sink. + with self._sinks_lock: + if self._sink_failures.get(sink.name): + self._sink_failures[sink.name] = 0 + except Exception as e: + with self._sinks_lock: + fails = self._sink_failures.get(sink.name, 0) + 1 + self._sink_failures[sink.name] = fails + tripped_now = fails >= self._SINK_FAILURE_THRESHOLD + if tripped_now: + self._tripped_sinks.add(sink.name) + if tripped_now: + logger.error( + "Audit sink '%s' tripped after %d consecutive failures; " + "will be skipped for the rest of this process. Last error: %s", + sink.name, + fails, + e, + ) + else: + logger.warning( + "Audit sink '%s' failed to emit event (%d/%d): %s", + sink.name, + fails, + self._SINK_FAILURE_THRESHOLD, + e, + ) + + def emit_rule_evaluation( + self, + policy_id: str, + rule_name: str, + pack_name: str, + hook: str, + matched: bool, + action: str, + enforcement_mode: EnforcementMode, + detail: str = "", + agent_name: str = "agent", + description: str = "", + ) -> None: + """Convenience method to emit a rule evaluation event. + + ``enforcement_mode`` travels on the event so sinks don't have to + read a process-global. Each emitter (instance-scoped) supplies + its own mode — parallel runtimes can run in different modes + simultaneously, and a process-global wouldn't be authoritative + for any of them. + """ + self.emit( + AuditEvent( + event_type=EventType.RULE_EVALUATION, + agent_name=agent_name, + hook=hook, + data={ + "policy_id": policy_id, + "rule_name": rule_name, + "pack_name": pack_name, + "matched": matched, + "action": action, + "enforcement_mode": enforcement_mode, + "detail": detail, + "description": description, + "status": "MATCHED" if matched else "PASS", + }, + ) + ) + + def emit_hook_summary( + self, + hook: str, + agent_name: str, + total_rules: int, + matched_rules: int, + final_action: str, + enforcement_mode: EnforcementMode, + ) -> None: + """Convenience method to emit a hook summary event.""" + self.emit( + AuditEvent( + event_type=EventType.HOOK_END, + agent_name=agent_name, + hook=hook, + data={ + "total_rules": total_rules, + "matched_rules": matched_rules, + "final_action": final_action, + "enforcement_mode": enforcement_mode, + }, + ) + ) + + def emit_session_start( + self, + session_id: str, + agent_name: str, + packs: list[str], + enforcement_mode: EnforcementMode, + ) -> None: + """Convenience method to emit a session start event. + + Same ``enforcement_mode: EnforcementMode`` contract as + :meth:`emit_rule_evaluation` and :meth:`emit_hook_summary` + — every governance event carries the emitter's per-instance + mode so sinks don't depend on a process-global. + """ + self.emit( + AuditEvent( + event_type=EventType.SESSION_START, + agent_name=agent_name, + data={ + "session_id": session_id, + "packs": packs, + "enforcement_mode": enforcement_mode, + }, + ) + ) + + def emit_session_end( + self, + session_id: str, + agent_name: str, + total_evaluations: int, + rules_matched: int, + rules_denied: int, + enforcement_mode: EnforcementMode, + ) -> None: + """Convenience method to emit a session end event.""" + self.emit( + AuditEvent( + event_type=EventType.SESSION_END, + agent_name=agent_name, + data={ + "session_id": session_id, + "total_evaluations": total_evaluations, + "rules_matched": rules_matched, + "rules_denied": rules_denied, + "enforcement_mode": enforcement_mode, + }, + ) + ) + + def flush(self) -> None: + """Flush every registered sink. + + Per-sink — a sink that maintains its own buffer (OTel batched + export, HTTP batcher, etc.) gets a chance to drain. The + manager itself holds no queue. + """ + with self._sinks_lock: + sinks = list(self._sinks) + for sink in sinks: + try: + sink.flush() + except Exception as e: + logger.warning("Audit sink '%s' failed to flush: %s", sink.name, e) + + def close(self) -> None: + """Close all sinks and release resources. + + Idempotent — a manager that has already been closed has an + empty sink list, so a repeat call is a no-op. + """ + with self._sinks_lock: + sinks = list(self._sinks) + self._sinks.clear() + self._sink_failures.clear() + self._tripped_sinks.clear() + for sink in sinks: + try: + sink.close() + except Exception as e: + logger.warning("Audit sink '%s' failed to close: %s", sink.name, e) diff --git a/src/uipath/runtime/governance/_audit/factory.py b/src/uipath/runtime/governance/_audit/factory.py new file mode 100644 index 0000000..334f867 --- /dev/null +++ b/src/uipath/runtime/governance/_audit/factory.py @@ -0,0 +1,33 @@ +"""Factory function for creating audit sinks by name. + +Used by :class:`AuditManager` to construct the always-on ``traces`` +sink at initialization. +""" + +from __future__ import annotations + +import logging + +from .base import AuditSink + +logger = logging.getLogger(__name__) + + +def create_sink(name: str) -> AuditSink | None: + """Create an audit sink by name. + + Args: + name: Name of the sink to create (currently only ``traces``). + + Returns: + The created sink, or ``None`` if the name is unknown. + """ + name = name.lower() + + if name == "traces": + from .traces import TracesAuditSink + + return TracesAuditSink() + + logger.warning("Unknown audit sink: %s", name) + return None diff --git a/src/uipath/runtime/governance/_audit/traces.py b/src/uipath/runtime/governance/_audit/traces.py new file mode 100644 index 0000000..e0092cf --- /dev/null +++ b/src/uipath/runtime/governance/_audit/traces.py @@ -0,0 +1,285 @@ +"""OpenTelemetry traces audit sink for governance events. + +Emits an OpenTelemetry span per rule evaluation and per hook summary. +This sink emits spans only — it does not resolve or stamp +job-execution metadata (organization, tenant, folder, job, trace id) +onto them. That resolution is owned by the platform-side OTel +exporter that ships spans downstream, so the runtime governance +contract stays scoped to span emission. +""" + +from __future__ import annotations + +import importlib.metadata +import logging +from typing import Any + +from uipath.core.governance import EnforcementMode + +from .base import AuditEvent, AuditSink, EventType + +logger = logging.getLogger(__name__) + + +def _package_version() -> str: + """Return the installed ``uipath-runtime`` version (``unknown`` if absent).""" + try: + return importlib.metadata.version("uipath-runtime") + except importlib.metadata.PackageNotFoundError: + return "unknown" + + +# Stamped on every governance span as ``uipath_governance.version`` so +# consumers can correlate the trace payload shape with the runtime +# release that produced it. Resolved once at import time — the installed +# package version doesn't change for the life of the process. +SCHEMA_VERSION = _package_version() + +# Value of the ``type`` / ``span_type`` span attributes on every +# governance span. Local to the runtime trace contract — kept as a +# string literal (not a cross-package import) so the runtime stays +# self-contained. +SPAN_TYPE_AGENT_RUN = "agentRun" + +# Set as the ``source`` attribute on every governance span. Lets +# consumers identify which producer emitted a given span when more +# than one governance producer feeds the same trace backend. +GOVERNANCE_SOURCE = "governance-checker-python" + +# Shared attribute namespace for every governance span attribute. +# Concatenated into each ``span.set_attribute`` call so the prefix +# appears in one place and a future rename is a one-line change. +NS = "uipath_governance" + +# Governance verdict / action vocabulary (UPPER_SNAKE). +EVALUATOR_ALLOW = "ALLOW" +EVALUATOR_DENY = "DENY" +EVALUATOR_HITL = "HITL" + +ACTION_ALLOW = "ALLOW" +ACTION_DENY = "DENY" +ACTION_HITL = "HITL" +ACTION_AUDIT = "AUDIT" +ACTION_NONE = "NONE" + +def _resolve_mode(event: AuditEvent) -> EnforcementMode: + """Read the enforcement mode the evaluator stamped on the event. + + Mode travels with the event (set by the emitter when it calls + :meth:`AuditManager.emit_rule_evaluation` / + :meth:`emit_hook_summary` and passes its own per-instance mode) so + the sink doesn't read a process-global that wouldn't be + authoritative in a parallel-runtime setup. + + Falls back to ``AUDIT`` only when the field is missing — that's a + contract violation by the emitter (every governance event must carry + the mode), but defaulting to the safe option avoids a sink crash. + """ + mode = event.data.get("enforcement_mode") + if isinstance(mode, EnforcementMode): + return mode + if isinstance(mode, str): + try: + return EnforcementMode(mode.lower()) + except ValueError: + pass + return EnforcementMode.AUDIT + + +def _derive_results( + matched: bool, configured_action: str, mode: EnforcementMode +) -> tuple[str, str]: + """Return ``(evaluator_result, action_applied)`` in spec vocabulary. + + ``evaluator_result`` is mode-independent — what the rule decided. The + rule's configured ``audit`` action collapses into a DENY decision + here; whether that DENY is actually applied is reflected in + ``action_applied``. + + ``action_applied`` is mode-driven. Currently only AUDIT mode is wired + in the runtime, so every non-allow result lands on ``AUDIT``; the + ENFORCE branch is kept so the contract is already correct when + ENFORCE arrives in a later phase. + + The configured ``audit`` rule-level action acts as a per-rule audit + override: even when global mode is ENFORCE, such a rule only ever + produces ``action_applied = AUDIT``. This preserves today's "audit + never blocks" behavior. + """ + action = configured_action.lower() + + if not matched or action == "allow": + return EVALUATOR_ALLOW, ACTION_NONE + + if action == "escalate": + evaluator = EVALUATOR_HITL + else: + evaluator = EVALUATOR_DENY + + # Per-rule audit override: emit AUDIT regardless of global mode. + if action == "audit": + return evaluator, ACTION_AUDIT + + if mode == EnforcementMode.ENFORCE: + return evaluator, ACTION_DENY if evaluator == EVALUATOR_DENY else ACTION_HITL + return evaluator, ACTION_AUDIT + +class TracesAuditSink(AuditSink): + """Audit sink that emits an OpenTelemetry span per governance event.""" + + def __init__(self) -> None: + """Initialize the sink with a deferred tracer and zero span count.""" + self._tracer: Any = None # Can be None, Tracer, or False + self._spans_created = 0 + + @property + def name(self) -> str: + """Constant sink identifier.""" + return "traces" + + def _get_tracer(self) -> Any: + """Get or create the OpenTelemetry tracer.""" + if self._tracer is None: + try: + from opentelemetry import trace + + self._tracer = trace.get_tracer("uipath.governance") + logger.info("OpenTelemetry tracer initialized for governance traces") + except ImportError: + logger.warning( + "OpenTelemetry not available — governance traces disabled." + ) + self._tracer = False + return self._tracer if self._tracer else None + + def emit(self, event: AuditEvent) -> None: + """Create a span for RULE_EVALUATION or HOOK_END events; drop others.""" + if event.event_type == EventType.RULE_EVALUATION: + self._emit_rule_span(event) + elif event.event_type == EventType.HOOK_END: + self._emit_hook_span(event) + + def _emit_hook_span(self, event: AuditEvent) -> None: + """Create a span for a hook summary (always emitted for each governance check).""" + tracer = self._get_tracer() + if tracer is None: + return + + try: + data = event.data + hook = event.hook or "unknown" + span_name = f"governance.{hook.lower()}" + + # Sink dispatch runs on the caller's thread (see + # :meth:`AuditManager.emit`), so the current OTel context + # is the agent's live span — the governance span attaches + # as its child without any cross-thread plumbing. + with tracer.start_as_current_span(span_name) as span: + span.set_attribute("type", SPAN_TYPE_AGENT_RUN) + span.set_attribute("span_type", SPAN_TYPE_AGENT_RUN) + span.set_attribute("uipath.custom_instrumentation", True) + span.set_attribute(f"{NS}.source", GOVERNANCE_SOURCE) + + # Mode travels on the event so parallel runtimes running + # different per-instance modes don't cross-contaminate. + mode = _resolve_mode(event) + final_action = data.get("final_action", "allow") + _, action_applied = _derive_results( + matched=final_action.lower() != "allow", + configured_action=final_action, + mode=mode, + ) + span.set_attribute(f"{NS}.hook", hook) + span.set_attribute(f"{NS}.action_applied", action_applied) + span.set_attribute(f"{NS}.mode", mode.value.upper()) + + # Hook spans are summary containers — severity lives on + # the per-rule spans. Marking the hook ERROR would paint + # the whole lifecycle phase as failed when only one rule + # fired beneath it. + + self._spans_created += 1 + + except Exception as e: + logger.warning("Failed to create governance hook span: %s", e) + + def _emit_rule_span(self, event: AuditEvent) -> None: + """Create a span for a rule evaluation.""" + tracer = self._get_tracer() + if tracer is None: + return + + try: + data = event.data + policy_id = data.get("policy_id", "unknown") + span_name = f"{NS}.rule.{policy_id}" + + # See _emit_hook_span: sync dispatch on the caller's + # thread means the current OTel context is the agent's + # live span, so this rule span attaches as its child. + with tracer.start_as_current_span(span_name) as span: + span.set_attribute("type", SPAN_TYPE_AGENT_RUN) + span.set_attribute("span_type", SPAN_TYPE_AGENT_RUN) + span.set_attribute("uipath.custom_instrumentation", True) + span.set_attribute(f"{NS}.source", GOVERNANCE_SOURCE) + + # Single source of truth for the emitted attributes + # below AND the verbosityLevel / Status decision further + # down. Mode comes from the event (per-instance) so + # parallel runtimes don't cross-contaminate. + mode = _resolve_mode(event) + configured_action = data.get("action", "allow") + matched = bool(data.get("matched", False)) + evaluator_result, action_applied = _derive_results( + matched=matched, + configured_action=configured_action, + mode=mode, + ) + + span.set_attribute(f"{NS}.policy_id", policy_id) + span.set_attribute(f"{NS}.rule_name", data.get("rule_name", "")) + span.set_attribute(f"{NS}.pack_name", data.get("pack_name", "")) + span.set_attribute(f"{NS}.hook", event.hook) + span.set_attribute(f"{NS}.evaluator_result", evaluator_result) + span.set_attribute(f"{NS}.action_applied", action_applied) + span.set_attribute(f"{NS}.mode", mode.value.upper()) + span.set_attribute(f"{NS}.version", SCHEMA_VERSION) + + detail = data.get("detail", "") + if detail: + span.set_attribute(f"{NS}.evidence", detail[:500]) + + # Severity is driven off the derived ``action_applied``: + # - DENY — runtime blocked → verbosityLevel=4 + + # Status.ERROR (agent span genuinely failed). + # - AUDIT / HITL — advisory, runtime did not block → + # verbosityLevel=3, Status stays UNSET. Marking the + # agent span failed for an advisory rule would mislead. + # - ALLOW / NONE — no verbosityLevel attribute set. + if action_applied == ACTION_DENY: + span.set_attribute("verbosityLevel", 4) + try: + from opentelemetry.trace import Status, StatusCode + + span.set_status( + Status( + StatusCode.ERROR, + f"Policy violation: " + f"{data.get('rule_name', policy_id)} " + f"(action={configured_action.lower()})", + ) + ) + except ImportError: + pass + elif action_applied in (ACTION_AUDIT, ACTION_HITL): + span.set_attribute("verbosityLevel", 3) + + self._spans_created += 1 + + except Exception as e: + logger.warning("Failed to create governance span: %s", e) + + @property + def spans_created(self) -> int: + """Number of spans created.""" + return self._spans_created diff --git a/src/uipath/runtime/governance/native/models.py b/src/uipath/runtime/governance/native/models.py index 125e75e..eb874a7 100644 --- a/src/uipath/runtime/governance/native/models.py +++ b/src/uipath/runtime/governance/native/models.py @@ -74,12 +74,16 @@ class Rule: @dataclass class CheckContext: - """Context passed to rule evaluation.""" + """Context passed to rule evaluation. + + Scoped to evaluator input data only. Trace correlation is + intentionally not carried here — that concern is owned by the + provider / platform layer, not by the evaluator input model. + """ hook: LifecycleHook agent_name: str runtime_id: str - trace_id: str # Content fields (populated based on hook) agent_input: str = "" diff --git a/tests/test_audit_manager_lifecycle.py b/tests/test_audit_manager_lifecycle.py new file mode 100644 index 0000000..25094f9 --- /dev/null +++ b/tests/test_audit_manager_lifecycle.py @@ -0,0 +1,207 @@ +"""Lifecycle tests for :class:`AuditManager`. + +Pins the production-readiness invariants of the audit manager: + +- Construction is side-effect-free: no background thread, no atexit + registration, no global state mutation. +- :meth:`close` is idempotent. +- :meth:`emit` dispatches on the caller's thread, so an OTel-backed + sink sees the caller's live span without any cross-thread plumbing. +""" + +from __future__ import annotations + +import threading +from typing import Any + +from uipath.runtime.governance._audit.base import ( + AuditEvent, + AuditManager, + AuditSink, + EventType, +) + +# --------------------------------------------------------------------------- +# Construction is side-effect-free +# --------------------------------------------------------------------------- + + +def test_construction_starts_no_background_thread() -> None: + """``AuditManager()`` must not spawn a worker thread. + + Regression guard for the design pivot: the audit pipeline used to + construct a daemon worker thread eagerly. Construction now only + builds in-memory state; any async export lives inside the sink. + """ + before = {t.name for t in threading.enumerate()} + m = AuditManager(register_default_sinks=False) + after = {t.name for t in threading.enumerate()} + try: + assert after == before, ( + f"AuditManager() spawned a thread; new threads: {after - before}" + ) + finally: + m.close() + + +def test_default_sink_registered_on_construction() -> None: + """With defaults, the traces sink is auto-registered.""" + m = AuditManager() + try: + assert "traces" in m.list_sinks() + finally: + m.close() + + +def test_bare_construction_skips_default_sink() -> None: + """``register_default_sinks=False`` produces an empty manager.""" + m = AuditManager(register_default_sinks=False) + try: + assert m.list_sinks() == [] + finally: + m.close() + + +# --------------------------------------------------------------------------- +# close() is idempotent and clears sinks +# --------------------------------------------------------------------------- + + +def test_close_clears_sinks_and_failure_state() -> None: + """``close()`` empties sinks, failure counters, and tripped set.""" + + class _Sink(AuditSink): + def __init__(self, name: str) -> None: + self._name = name + self.closed = False + + @property + def name(self) -> str: + return self._name + + def emit(self, event: AuditEvent) -> None: + pass + + def close(self) -> None: + self.closed = True + + m = AuditManager(register_default_sinks=False) + s = _Sink("test") + m.register_sink(s) + m._sink_failures["test"] = 3 + m._tripped_sinks.add("test") + + m.close() + + assert m.list_sinks() == [] + assert m._sink_failures == {} + assert m._tripped_sinks == set() + assert s.closed + + +def test_close_is_idempotent() -> None: + """Calling ``close()`` twice must not raise.""" + m = AuditManager(register_default_sinks=False) + m.close() + m.close() # must not raise + + +# --------------------------------------------------------------------------- +# flush() delegates to every sink +# --------------------------------------------------------------------------- + + +def test_flush_calls_flush_on_each_sink() -> None: + """The manager holds no buffer; ``flush()`` is a fan-out to sinks.""" + + class _Sink(AuditSink): + def __init__(self, name: str) -> None: + self._name = name + self.flush_count = 0 + + @property + def name(self) -> str: + return self._name + + def emit(self, event: AuditEvent) -> None: + pass + + def flush(self) -> None: + self.flush_count += 1 + + m = AuditManager(register_default_sinks=False) + a, b = _Sink("a"), _Sink("b") + m.register_sink(a) + m.register_sink(b) + try: + m.flush() + assert a.flush_count == 1 + assert b.flush_count == 1 + finally: + m.close() + + +# --------------------------------------------------------------------------- +# emit() runs on the caller's thread — OTel context visible directly +# --------------------------------------------------------------------------- + + +def test_emit_runs_on_caller_thread() -> None: + """``emit()`` invokes sinks synchronously on the calling thread. + + Asserts the design contract that lets OTel-backed sinks see the + agent's live span via ``trace.get_current_span()`` without any + cross-thread context propagation. + """ + captured: dict[str, Any] = {} + + class _Probe(AuditSink): + @property + def name(self) -> str: + return "probe" + + def emit(self, event: AuditEvent) -> None: + captured["thread"] = threading.current_thread() + + m = AuditManager(register_default_sinks=False) + m.register_sink(_Probe()) + try: + m.emit(AuditEvent(event_type=EventType.RULE_EVALUATION)) + assert captured["thread"] is threading.current_thread() + finally: + m.close() + + +def test_emit_propagates_otel_span_via_current_context() -> None: + """An OTel-backed sink sees the caller's live span directly. + + With sync dispatch there's no contextvars snapshot/restore — the + sink just calls ``trace.get_current_span()`` on the same thread the + caller is on, and that's the span the caller has active. + """ + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + + captured: dict[str, Any] = {} + + class _Probe(AuditSink): + @property + def name(self) -> str: + return "probe" + + def emit(self, event: AuditEvent) -> None: + sc = trace.get_current_span().get_span_context() + captured["trace_id"] = sc.trace_id if sc.is_valid else None + captured["span_id"] = sc.span_id if sc.is_valid else None + + tracer = TracerProvider().get_tracer("test") + m = AuditManager(register_default_sinks=False) + m.register_sink(_Probe()) + try: + with tracer.start_as_current_span("agent-run") as span: + expected = span.get_span_context() + m.emit(AuditEvent(event_type=EventType.RULE_EVALUATION)) + assert captured["trace_id"] == expected.trace_id + assert captured["span_id"] == expected.span_id + finally: + m.close() diff --git a/tests/test_audit_register_sink.py b/tests/test_audit_register_sink.py new file mode 100644 index 0000000..d0e3590 --- /dev/null +++ b/tests/test_audit_register_sink.py @@ -0,0 +1,108 @@ +"""Tests for ``AuditManager.register_sink`` failure-counter semantics. + +A re-registered same-name sink must NOT inherit the previous instance's +tripped circuit-breaker state. ``unregister_sink`` already clears these +counters, but ``register_sink`` also clears them on a successful add as +defense-in-depth (covers tests / external callers that touch the +internal counter dicts directly). +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from uipath.runtime.governance._audit.base import ( + AuditEvent, + AuditManager, + AuditSink, + EventType, +) + + +class _NoopSink(AuditSink): + """Sink that records emit calls and never raises.""" + + def __init__(self, name: str = "test-sink") -> None: + self._name = name + self.events: list[AuditEvent] = [] + + @property + def name(self) -> str: + return self._name + + def emit(self, event: AuditEvent) -> None: + self.events.append(event) + + +def _event() -> AuditEvent: + return AuditEvent(event_type=EventType.RULE_EVALUATION, agent_name="a") + + +@pytest.fixture +def manager() -> Any: + """Build a fresh AuditManager with no default sinks. + + ``register_default_sinks=False`` keeps the traces sink out of the + test, so assertions about registered sinks see only what the test + puts there. + """ + return AuditManager(register_default_sinks=False) + + +def test_register_clears_stale_failure_counter(manager: AuditManager) -> None: + """A new sink with a name that previously tripped starts fresh.""" + # Simulate prior instance having tripped the circuit-breaker without + # going through unregister (e.g. test code or external code that + # mutated the counters directly). + manager._sink_failures["test-sink"] = manager._SINK_FAILURE_THRESHOLD + manager._tripped_sinks.add("test-sink") + + new_sink = _NoopSink(name="test-sink") + manager.register_sink(new_sink) + + # Counter and tripped-set must be cleared. + assert manager._sink_failures.get("test-sink", 0) == 0 + assert "test-sink" not in manager._tripped_sinks + + # And the new sink actually receives events (would be skipped if + # still considered tripped). + manager.emit(_event()) + assert len(new_sink.events) == 1 + + +def test_register_does_not_clear_for_duplicate(manager: AuditManager) -> None: + """Re-registering an already-present sink is a no-op (no counter reset).""" + sink = _NoopSink(name="test-sink") + manager.register_sink(sink) + + # Simulate the existing sink having accumulated some failures. + manager._sink_failures["test-sink"] = 3 + + # A second register call with the same name should NOT clear those + # failures — the duplicate-check fires before the reset. + duplicate = _NoopSink(name="test-sink") + manager.register_sink(duplicate) + + assert manager._sink_failures["test-sink"] == 3 + + +def test_unregister_then_register_starts_fresh(manager: AuditManager) -> None: + """The full lifecycle: register → trip → unregister → register again.""" + sink = _NoopSink(name="test-sink") + manager.register_sink(sink) + manager._sink_failures["test-sink"] = manager._SINK_FAILURE_THRESHOLD + manager._tripped_sinks.add("test-sink") + + manager.unregister_sink("test-sink") + # Unregister already clears. + assert "test-sink" not in manager._tripped_sinks + + new_sink = _NoopSink(name="test-sink") + manager.register_sink(new_sink) + assert manager._sink_failures.get("test-sink", 0) == 0 + assert "test-sink" not in manager._tripped_sinks + + manager.emit(_event()) + assert len(new_sink.events) == 1 diff --git a/tests/test_traces_severity.py b/tests/test_traces_severity.py new file mode 100644 index 0000000..30fd356 --- /dev/null +++ b/tests/test_traces_severity.py @@ -0,0 +1,271 @@ +"""Tests for trace-span verbosity / status semantics. + +``TracesAuditSink`` emits an OpenTelemetry span for every governance +hook end and every rule evaluation. The contract follows §4 of the +cross-product unification doc — verdict is split into ``evaluator_result`` +(what the rule decided, mode-independent) and ``action_applied`` (what +actually happened, derived from evaluator_result + mode). + +Mode travels with the event (set by the emitter from its +per-instance ``EnforcementMode``) so parallel runtimes running +different modes don't cross-contaminate the sink's view. + +- ``verbosityLevel = 4`` (Error) and ``StatusCode.ERROR`` fire **only** + when ``action_applied = DENY`` — i.e. the runtime actually blocked + the agent (ENFORCE mode + configured action ``deny``). +- ``verbosityLevel = 3`` (Warning) and ``Status.UNSET`` for advisory + outcomes (``action_applied`` in ``{AUDIT, HITL}``). HITL is its own + spec bucket — escalation pauses for human review, it doesn't fail + the run, so it stays Warning even in ENFORCE mode. +- Hook spans never set Status, regardless of mode or final_action. + They're summary containers; severity belongs on the per-rule span. +- ``ALLOW`` / ``NONE`` results leave verbosityLevel unset (Orchestrator + default = 2, Information) and never call set_status. +""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest +from uipath.core.governance import EnforcementMode + +from uipath.runtime.governance._audit.base import AuditEvent, EventType +from uipath.runtime.governance._audit.traces import TracesAuditSink + + +@pytest.fixture +def captured_span(monkeypatch: pytest.MonkeyPatch) -> MagicMock: + """Wire ``TracesAuditSink`` to a mock tracer and return the span mock.""" + span = MagicMock(name="span") + tracer = MagicMock(name="tracer") + tracer.start_as_current_span.return_value.__enter__.return_value = span + tracer.start_as_current_span.return_value.__exit__.return_value = False + monkeypatch.setattr(TracesAuditSink, "_get_tracer", lambda self: tracer) + return span + + +def _hook_event(final_action: str, mode: EnforcementMode) -> AuditEvent: + return AuditEvent( + event_type=EventType.HOOK_END, + agent_name="agent", + hook="after_model", + data={ + "total_rules": 1, + "matched_rules": 1 if final_action != "allow" else 0, + "final_action": final_action, + "enforcement_mode": mode, + }, + ) + + +def _rule_event( + matched: bool, action: str, mode: EnforcementMode = EnforcementMode.AUDIT +) -> AuditEvent: + return AuditEvent( + event_type=EventType.RULE_EVALUATION, + agent_name="agent", + hook="after_model", + data={ + "policy_id": "A.10.4", + "rule_name": "commitment-language", + "pack_name": "iso42001", + "matched": matched, + "action": action, + "enforcement_mode": mode, + "status": "MATCHED" if matched else "PASS", + "detail": "Customer-binding commitment detected.", + }, + ) + + +def _span_attrs(span: MagicMock) -> dict[str, object]: + """Return a mapping of attribute name → value for set_attribute calls.""" + attrs: dict[str, object] = {} + for call in span.set_attribute.call_args_list: + key, value = call.args + attrs[key] = value + return attrs + + +# --------------------------------------------------------------------------- +# Hook span — never marked ERROR +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "final_action,mode", + [ + ("deny", EnforcementMode.ENFORCE), + ("deny", EnforcementMode.AUDIT), + ("audit", EnforcementMode.AUDIT), + ("escalate", EnforcementMode.AUDIT), + ("allow", EnforcementMode.AUDIT), + ], +) +def test_hook_span_never_sets_error( + captured_span: MagicMock, final_action: str, mode: EnforcementMode +) -> None: + """Hook spans are summary containers — they never carry an ERROR Status.""" + sink = TracesAuditSink() + sink.emit(_hook_event(final_action=final_action, mode=mode)) + assert not captured_span.set_status.called, ( + f"Hook span should never set_status; called with " + f"final_action={final_action!r}, mode={mode!r}" + ) + + +# --------------------------------------------------------------------------- +# Rule span — enforce-mode DENY is the only Status.ERROR case +# --------------------------------------------------------------------------- + + +def test_enforce_mode_deny_is_error(captured_span: MagicMock) -> None: + """Enforce mode + action=deny = real block → verbosityLevel=4 + Status.ERROR.""" + sink = TracesAuditSink() + sink.emit(_rule_event(matched=True, action="deny", mode=EnforcementMode.ENFORCE)) + + attrs = _span_attrs(captured_span) + assert attrs.get("verbosityLevel") == 4 + assert attrs.get("uipath_governance.evaluator_result") == "DENY" + assert attrs.get("uipath_governance.action_applied") == "DENY" + assert attrs.get("uipath_governance.mode") == "ENFORCE" + + assert captured_span.set_status.called, ( + "Status.ERROR must fire for enforce-mode deny violation" + ) + (status_arg,) = captured_span.set_status.call_args.args + from opentelemetry.trace import Status, StatusCode + + assert isinstance(status_arg, Status) + assert status_arg.status_code is StatusCode.ERROR + assert "commitment-language" in status_arg.description + assert "deny" in status_arg.description + + +def test_enforce_mode_escalate_is_hitl_warning(captured_span: MagicMock) -> None: + """Enforce mode + action=escalate = HITL pause, not a block. + + HITL is its own spec bucket distinct from DENY — escalation pauses + for human review, the run isn't failed. So verbosityLevel stays at + Warning and Status is not marked ERROR. + """ + sink = TracesAuditSink() + sink.emit(_rule_event(matched=True, action="escalate", mode=EnforcementMode.ENFORCE)) + + attrs = _span_attrs(captured_span) + assert attrs.get("verbosityLevel") == 3 + assert attrs.get("uipath_governance.evaluator_result") == "HITL" + assert attrs.get("uipath_governance.action_applied") == "HITL" + assert attrs.get("uipath_governance.mode") == "ENFORCE" + assert not captured_span.set_status.called + + +# --------------------------------------------------------------------------- +# Rule span — advisory violations (audit mode, or audit-action rules) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "action,expected_evaluator", + [("deny", "DENY"), ("audit", "DENY"), ("escalate", "HITL")], +) +def test_audit_mode_violation_is_warning( + captured_span: MagicMock, action: str, expected_evaluator: str +) -> None: + """Audit mode never blocks → action_applied=AUDIT, verbosityLevel=3. + + Surfacing Status.ERROR for an audit-mode violation would falsely + mark the agent's run as failed when the runtime intentionally + let it through. evaluator_result still records the rule's actual + decision (DENY/HITL), independent of mode. + """ + sink = TracesAuditSink() + sink.emit(_rule_event(matched=True, action=action, mode=EnforcementMode.AUDIT)) + + attrs = _span_attrs(captured_span) + assert attrs.get("verbosityLevel") == 3 + assert attrs.get("uipath_governance.evaluator_result") == expected_evaluator + assert attrs.get("uipath_governance.action_applied") == "AUDIT" + assert attrs.get("uipath_governance.mode") == "AUDIT" + + assert not captured_span.set_status.called, ( + f"Audit-mode {action} violation must NOT set Status.ERROR" + ) + + +def test_enforce_mode_audit_action_is_warning(captured_span: MagicMock) -> None: + """Enforce mode + action=audit is a per-rule audit override. + + The rule's configured ``audit`` action means "log this match but + don't block" even when the global mode is ENFORCE. evaluator_result + is DENY (the rule decided to deny), but action_applied is AUDIT + (the per-rule override kicks in), so verbosity stays Warning. + """ + sink = TracesAuditSink() + sink.emit(_rule_event(matched=True, action="audit", mode=EnforcementMode.ENFORCE)) + + attrs = _span_attrs(captured_span) + assert attrs.get("verbosityLevel") == 3 + assert attrs.get("uipath_governance.evaluator_result") == "DENY" + assert attrs.get("uipath_governance.action_applied") == "AUDIT" + assert attrs.get("uipath_governance.mode") == "ENFORCE" + assert not captured_span.set_status.called + + +# --------------------------------------------------------------------------- +# Rule span — no violation, no verbosityLevel attribute (Orchestrator default = 2) +# --------------------------------------------------------------------------- + + +def test_unmatched_rule_no_verbosity_no_error(captured_span: MagicMock) -> None: + """Unmatched evaluations → evaluator_result=ALLOW, action_applied=NONE, quiet.""" + sink = TracesAuditSink() + sink.emit(_rule_event(matched=False, action="deny", mode=EnforcementMode.ENFORCE)) + + attrs = _span_attrs(captured_span) + assert "verbosityLevel" not in attrs + assert attrs.get("uipath_governance.evaluator_result") == "ALLOW" + assert attrs.get("uipath_governance.action_applied") == "NONE" + assert not captured_span.set_status.called + + +def test_matched_allow_action_no_verbosity(captured_span: MagicMock) -> None: + """A rule whose action is 'allow' is an explicit non-violation.""" + sink = TracesAuditSink() + sink.emit(_rule_event(matched=True, action="allow", mode=EnforcementMode.ENFORCE)) + + attrs = _span_attrs(captured_span) + assert "verbosityLevel" not in attrs + assert attrs.get("uipath_governance.evaluator_result") == "ALLOW" + assert attrs.get("uipath_governance.action_applied") == "NONE" + assert not captured_span.set_status.called + + +# --------------------------------------------------------------------------- +# Cross-runtime isolation — the architectural motivation for the refactor +# --------------------------------------------------------------------------- + + +def test_two_events_carry_independent_modes(captured_span: MagicMock) -> None: + """Parallel runtimes (different modes) cannot cross-contaminate the sink. + + Mode travels on each event (set by the emitter from its own + per-instance ``EnforcementMode``), so two consecutive emits with + different modes each render their own correct + ``uipath_governance.mode`` value — no shared state in the sink + that one runtime could clobber for another. + """ + sink = TracesAuditSink() + + sink.emit(_rule_event(matched=True, action="deny", mode=EnforcementMode.ENFORCE)) + sink.emit(_rule_event(matched=True, action="deny", mode=EnforcementMode.AUDIT)) + + # Collect every set_attribute call ordered by emit. + calls = [c.args for c in captured_span.set_attribute.call_args_list] + modes = [v for k, v in calls if k == "uipath_governance.mode"] + actions_applied = [v for k, v in calls if k == "uipath_governance.action_applied"] + assert modes == ["ENFORCE", "AUDIT"] + assert actions_applied == ["DENY", "AUDIT"] + + diff --git a/uv.lock b/uv.lock index e3a9a78..4f29f88 100644 --- a/uv.lock +++ b/uv.lock @@ -1191,7 +1191,7 @@ dev = [ requires-dist = [ { name = "chardet", specifier = ">=5.2.0,<8.0" }, { name = "pyyaml", specifier = ">=6.0,<7.0" }, - { name = "uipath-core", specifier = ">=0.5.21,<0.6.0" }, + { name = "uipath-core", specifier = ">=0.5.22,<0.6.0" }, { name = "vadersentiment", specifier = ">=3.3.2,<4.0" }, ]