diff --git a/.github/scripts/modal-sync-secrets.sh b/.github/scripts/modal-sync-secrets.sh index b56384d88..668bb0bb0 100755 --- a/.github/scripts/modal-sync-secrets.sh +++ b/.github/scripts/modal-sync-secrets.sh @@ -1,28 +1,50 @@ #!/bin/bash # Sync secrets from GitHub to Modal environment # Usage: ./modal-sync-secrets.sh -# Required env vars: LOGFIRE_TOKEN, GCP_CREDENTIALS_JSON (optional) +# Required env vars: +# LOGFIRE_TOKEN +# GCP_CREDENTIALS_JSON (optional) +# OBSERVABILITY_ENABLED (optional, defaults to false) +# OBSERVABILITY_SHADOW_MODE (optional, defaults to true) +# OBSERVABILITY_OTLP_ENDPOINT (required when OBSERVABILITY_ENABLED=true) +# OBSERVABILITY_OTLP_HEADERS (required when OBSERVABILITY_ENABLED=true) set -euo pipefail MODAL_ENV="${1:?Modal environment required}" GH_ENV="${2:?GitHub environment required}" +OBSERVABILITY_ENABLED="${OBSERVABILITY_ENABLED:-false}" +OBSERVABILITY_SHADOW_MODE="${OBSERVABILITY_SHADOW_MODE:-true}" echo "Syncing secrets to Modal environment: $MODAL_ENV" +if [ "$OBSERVABILITY_ENABLED" = "true" ]; then + : "${OBSERVABILITY_OTLP_ENDPOINT:?OBSERVABILITY_OTLP_ENDPOINT is required when observability is enabled}" + : "${OBSERVABILITY_OTLP_HEADERS:?OBSERVABILITY_OTLP_HEADERS is required when observability is enabled}" +fi + # Sync Logfire secret uv run modal secret create policyengine-logfire \ "LOGFIRE_TOKEN=${LOGFIRE_TOKEN:-}" \ "LOGFIRE_ENVIRONMENT=$GH_ENV" \ --env="$MODAL_ENV" \ - --force || true + --force # Sync GCP credentials if provided if [ -n "${GCP_CREDENTIALS_JSON:-}" ]; then uv run modal secret create gcp-credentials \ "GOOGLE_APPLICATION_CREDENTIALS_JSON=$GCP_CREDENTIALS_JSON" \ --env="$MODAL_ENV" \ - --force || true + --force fi +uv run modal secret create policyengine-observability \ + "OBSERVABILITY_ENABLED=$OBSERVABILITY_ENABLED" \ + "OBSERVABILITY_SHADOW_MODE=$OBSERVABILITY_SHADOW_MODE" \ + "OBSERVABILITY_ENVIRONMENT=$GH_ENV" \ + "OBSERVABILITY_OTLP_ENDPOINT=${OBSERVABILITY_OTLP_ENDPOINT:-}" \ + "OBSERVABILITY_OTLP_HEADERS=${OBSERVABILITY_OTLP_HEADERS:-}" \ + --env="$MODAL_ENV" \ + --force + echo "Modal secrets synced" diff --git a/.github/workflows/modal-deploy.reusable.yml b/.github/workflows/modal-deploy.reusable.yml index 07ce2d777..b79d193b0 100644 --- a/.github/workflows/modal-deploy.reusable.yml +++ b/.github/workflows/modal-deploy.reusable.yml @@ -55,6 +55,10 @@ jobs: MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }} LOGFIRE_TOKEN: ${{ secrets.LOGFIRE_TOKEN }} GCP_CREDENTIALS_JSON: ${{ secrets.GCP_CREDENTIALS_JSON }} + OBSERVABILITY_ENABLED: ${{ vars.OBSERVABILITY_ENABLED }} + OBSERVABILITY_SHADOW_MODE: ${{ vars.OBSERVABILITY_SHADOW_MODE }} + OBSERVABILITY_OTLP_ENDPOINT: ${{ vars.OBSERVABILITY_OTLP_ENDPOINT }} + OBSERVABILITY_OTLP_HEADERS: ${{ secrets.OBSERVABILITY_OTLP_HEADERS }} run: ../../.github/scripts/modal-sync-secrets.sh "${{ inputs.modal_environment }}" "${{ inputs.environment }}" - name: Deploy simulation API to Modal diff --git a/changelog_entry.yaml b/changelog_entry.yaml index 586592697..87f52ce15 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -1,4 +1,4 @@ - bump: patch changes: changed: - - Bumped policyengine-core minimum version to 3.23.5 for pandas 3.0 compatibility + - Added baseline Grafana-compatible OTLP observability and stage timing telemetry for the simulation gateway and worker. diff --git a/libs/policyengine-fastapi/pyproject.toml b/libs/policyengine-fastapi/pyproject.toml index 7b875ccad..aa6e9a6a7 100644 --- a/libs/policyengine-fastapi/pyproject.toml +++ b/libs/policyengine-fastapi/pyproject.toml @@ -11,6 +11,7 @@ dependencies = [ "fastapi[standard] >=0.115.8,<0.116.0", "pyjwt >=2.10.1,<3.0.0", "opentelemetry-sdk >=1.30.0,<2.0.0", + "opentelemetry-exporter-otlp-proto-http >=1.30.0,<2.0.0", "sqlmodel >=0.0.22,<0.0.23", "python-json-logger >=3.2.1,<4.0.0", "opentelemetry-instrumentation-logging >=0.51b0,<0.52", diff --git a/libs/policyengine-fastapi/src/policyengine_fastapi/__init__.py b/libs/policyengine-fastapi/src/policyengine_fastapi/__init__.py index 7cc751967..2d92aad5f 100644 --- a/libs/policyengine-fastapi/src/policyengine_fastapi/__init__.py +++ b/libs/policyengine-fastapi/src/policyengine_fastapi/__init__.py @@ -8,4 +8,5 @@ TracerCaptureMode as TracerCaptureMode, build_observability as build_observability, get_observability as get_observability, + reset_observability_cache as reset_observability_cache, ) diff --git a/libs/policyengine-fastapi/src/policyengine_fastapi/observability/__init__.py b/libs/policyengine-fastapi/src/policyengine_fastapi/observability/__init__.py index 24ba71421..9dfd127bc 100644 --- a/libs/policyengine-fastapi/src/policyengine_fastapi/observability/__init__.py +++ b/libs/policyengine-fastapi/src/policyengine_fastapi/observability/__init__.py @@ -9,6 +9,7 @@ from .config import ( ObservabilityConfig as ObservabilityConfig, + parse_bool as parse_bool, parse_header_value_pairs as parse_header_value_pairs, ) from .contracts import ( @@ -26,12 +27,15 @@ ) from .emitters import ( Observability as Observability, + build_traceparent as build_traceparent, NoOpObservability as NoOpObservability, NoOpSpan as NoOpSpan, + OtlpObservability as OtlpObservability, ) from .provider import ( build_observability as build_observability, get_observability as get_observability, + reset_observability_cache as reset_observability_cache, ) from .stages import ( SimulationStage as SimulationStage, diff --git a/libs/policyengine-fastapi/src/policyengine_fastapi/observability/config.py b/libs/policyengine-fastapi/src/policyengine_fastapi/observability/config.py index fbe58ebd3..56df26279 100644 --- a/libs/policyengine-fastapi/src/policyengine_fastapi/observability/config.py +++ b/libs/policyengine-fastapi/src/policyengine_fastapi/observability/config.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field +import os from .stages import TracerCaptureMode @@ -30,6 +31,14 @@ def parse_header_value_pairs(raw: str | None) -> dict[str, str]: return headers +def parse_bool(raw: str | bool | None, default: bool = False) -> bool: + if raw is None: + return default + if isinstance(raw, bool): + return raw + return raw.strip().lower() in {"1", "true", "yes", "on"} + + @dataclass(frozen=True) class ObservabilityConfig: enabled: bool = False @@ -46,3 +55,36 @@ class ObservabilityConfig: @classmethod def disabled(cls, service_name: str = "policyengine-observability"): return cls(enabled=False, service_name=service_name) + + @classmethod + def from_env( + cls, + service_name: str, + environment: str = "production", + prefix: str = "OBSERVABILITY_", + ) -> "ObservabilityConfig": + return cls( + enabled=parse_bool(os.getenv(f"{prefix}ENABLED"), default=False), + shadow_mode=parse_bool( + os.getenv(f"{prefix}SHADOW_MODE"), + default=True, + ), + service_name=os.getenv(f"{prefix}SERVICE_NAME", service_name), + environment=os.getenv(f"{prefix}ENVIRONMENT", environment), + otlp_endpoint=os.getenv(f"{prefix}OTLP_ENDPOINT"), + otlp_headers=parse_header_value_pairs(os.getenv(f"{prefix}OTLP_HEADERS")), + artifact_bucket=os.getenv(f"{prefix}ARTIFACT_BUCKET"), + artifact_prefix=os.getenv( + f"{prefix}ARTIFACT_PREFIX", + "simulation-observability", + ), + tracer_capture_mode=TracerCaptureMode( + os.getenv( + f"{prefix}TRACER_CAPTURE_MODE", + TracerCaptureMode.DISABLED.value, + ) + ), + slow_run_threshold_seconds=float( + os.getenv(f"{prefix}SLOW_RUN_THRESHOLD_SECONDS", "30.0") + ), + ) diff --git a/libs/policyengine-fastapi/src/policyengine_fastapi/observability/emitters.py b/libs/policyengine-fastapi/src/policyengine_fastapi/observability/emitters.py index 97afa0eee..04c7282fa 100644 --- a/libs/policyengine-fastapi/src/policyengine_fastapi/observability/emitters.py +++ b/libs/policyengine-fastapi/src/policyengine_fastapi/observability/emitters.py @@ -2,8 +2,15 @@ from contextlib import AbstractContextManager from dataclasses import dataclass, field +from datetime import date, datetime +from enum import Enum +import json +import logging from typing import Protocol from typing import Any, Mapping +from opentelemetry.trace.propagation.tracecontext import ( + TraceContextTextMapPropagator, +) from .config import ObservabilityConfig from .contracts import SimulationLifecycleEvent, TracerArtifactManifest @@ -22,11 +29,87 @@ def set_attribute(self, key: str, value: Any) -> None: def add_event(self, name: str, attributes: Mapping[str, Any] | None = None) -> None: return None + def get_traceparent(self) -> str | None: + return None + + +class OTelSpan(AbstractContextManager["OTelSpan"]): + def __init__(self, context_manager: AbstractContextManager): + self._context_manager = context_manager + self._span = None + + def __enter__(self): + self._span = self._context_manager.__enter__() + return self + + def __exit__(self, exc_type, exc_value, traceback): + return self._context_manager.__exit__(exc_type, exc_value, traceback) + + def set_attribute(self, key: str, value: Any) -> None: + if self._span is not None: + self._span.set_attribute(key, _normalize_attribute_value(value)) + + def add_event(self, name: str, attributes: Mapping[str, Any] | None = None) -> None: + if self._span is not None: + self._span.add_event(name, _normalize_attributes(attributes)) + + def get_traceparent(self) -> str | None: + if self._span is None: + return None + return build_traceparent(self._span.get_span_context()) + + +class JsonPayloadFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + message = record.msg + if isinstance(message, str): + return message + if isinstance(message, Mapping): + payload = dict(message) + else: + payload = {"message": record.getMessage()} + payload.setdefault("severity", record.levelname) + payload.setdefault("logger", record.name) + return json.dumps(payload, sort_keys=True, default=_json_default) + + +def _json_default(value: Any) -> Any: + if isinstance(value, (datetime, date)): + return value.isoformat() + if isinstance(value, Enum): + return value.value + return str(value) + + +def _normalize_attribute_value(value: Any) -> Any: + if value is None: + return None + if isinstance(value, (bool, int, float, str)): + return value + return json.dumps(value, sort_keys=True, default=_json_default) + + +def _normalize_attributes( + attributes: Mapping[str, Any] | None, +) -> dict[str, Any]: + normalized: dict[str, Any] = {} + if attributes is None: + return normalized + + for key, value in attributes.items(): + normalized_value = _normalize_attribute_value(value) + if normalized_value is not None: + normalized[key] = normalized_value + return normalized + class Observability(Protocol): config: ObservabilityConfig - def emit_lifecycle_event(self, event: SimulationLifecycleEvent) -> None: ... + def emit_lifecycle_event( + self, + event: SimulationLifecycleEvent | Mapping[str, Any], + ) -> None: ... def emit_counter( self, @@ -45,8 +128,11 @@ def emit_histogram( def record_artifact_manifest(self, manifest: TracerArtifactManifest) -> None: ... def span( - self, name: str, attributes: Mapping[str, Any] | None = None - ) -> NoOpSpan: ... + self, + name: str, + attributes: Mapping[str, Any] | None = None, + parent_traceparent: str | None = None, + ) -> AbstractContextManager: ... def flush(self) -> None: ... @@ -55,7 +141,10 @@ def flush(self) -> None: ... class NoOpObservability: config: ObservabilityConfig = field(default_factory=ObservabilityConfig.disabled) - def emit_lifecycle_event(self, event: SimulationLifecycleEvent) -> None: + def emit_lifecycle_event( + self, + event: SimulationLifecycleEvent | Mapping[str, Any], + ) -> None: return None def emit_counter( @@ -77,8 +166,105 @@ def emit_histogram( def record_artifact_manifest(self, manifest: TracerArtifactManifest) -> None: return None - def span(self, name: str, attributes: Mapping[str, Any] | None = None) -> NoOpSpan: + def span( + self, + name: str, + attributes: Mapping[str, Any] | None = None, + parent_traceparent: str | None = None, + ) -> NoOpSpan: return NoOpSpan() def flush(self) -> None: return None + + +@dataclass +class OtlpObservability: + config: ObservabilityConfig + tracer: Any + meter: Any + lifecycle_logger: logging.Logger + tracer_provider: Any + meter_provider: Any + logger_provider: Any = None + counter_cache: dict[str, Any] = field(default_factory=dict) + histogram_cache: dict[str, Any] = field(default_factory=dict) + + def emit_lifecycle_event( + self, + event: SimulationLifecycleEvent | Mapping[str, Any], + ) -> None: + if hasattr(event, "model_dump"): + payload = event.model_dump(mode="json") + else: + payload = dict(event) + self.lifecycle_logger.info(payload) + + def emit_counter( + self, + name: str, + value: int = 1, + attributes: Mapping[str, str] | None = None, + ) -> None: + counter = self.counter_cache.get(name) + if counter is None: + counter = self.meter.create_counter(name) + self.counter_cache[name] = counter + counter.add(value, attributes=_normalize_attributes(attributes)) + + def emit_histogram( + self, + name: str, + value: float, + attributes: Mapping[str, str] | None = None, + ) -> None: + histogram = self.histogram_cache.get(name) + if histogram is None: + histogram = self.meter.create_histogram(name) + self.histogram_cache[name] = histogram + histogram.record(value, attributes=_normalize_attributes(attributes)) + + def record_artifact_manifest(self, manifest: TracerArtifactManifest) -> None: + self.lifecycle_logger.info( + { + "event_name": "simulation.tracer.artifact_manifest", + "manifest": manifest.model_dump(mode="json"), + } + ) + + def span( + self, + name: str, + attributes: Mapping[str, Any] | None = None, + parent_traceparent: str | None = None, + ) -> OTelSpan: + context = None + if parent_traceparent: + context = TraceContextTextMapPropagator().extract( + {"traceparent": parent_traceparent} + ) + return OTelSpan( + self.tracer.start_as_current_span( + name, + attributes=_normalize_attributes(attributes), + context=context, + ) + ) + + def flush(self) -> None: + if self.tracer_provider is not None: + self.tracer_provider.force_flush() + if self.meter_provider is not None: + self.meter_provider.force_flush() + if self.logger_provider is not None: + self.logger_provider.force_flush() + + +def build_traceparent(span_context: Any) -> str | None: + if not getattr(span_context, "is_valid", False): + return None + + trace_flags = int(getattr(span_context, "trace_flags", 0)) + return ( + f"00-{span_context.trace_id:032x}-{span_context.span_id:016x}-{trace_flags:02x}" + ) diff --git a/libs/policyengine-fastapi/src/policyengine_fastapi/observability/provider.py b/libs/policyengine-fastapi/src/policyengine_fastapi/observability/provider.py index 69ac0fef5..2f95456d0 100644 --- a/libs/policyengine-fastapi/src/policyengine_fastapi/observability/provider.py +++ b/libs/policyengine-fastapi/src/policyengine_fastapi/observability/provider.py @@ -1,25 +1,175 @@ from __future__ import annotations +import logging +import socket + +from opentelemetry.sdk.resources import ( + DEPLOYMENT_ENVIRONMENT, + SERVICE_INSTANCE_ID, + SERVICE_NAME, + SERVICE_NAMESPACE, + Resource, +) + from .config import ObservabilityConfig -from .emitters import NoOpObservability, Observability +from .emitters import ( + JsonPayloadFormatter, + NoOpObservability, + Observability, + OtlpObservability, +) + +_CACHE: dict[tuple, Observability] = {} + + +def _cache_key(config: ObservabilityConfig) -> tuple: + return ( + config.enabled, + config.shadow_mode, + config.service_name, + config.environment, + config.otlp_endpoint, + tuple(sorted(config.otlp_headers.items())), + config.artifact_bucket, + config.artifact_prefix, + config.tracer_capture_mode.value, + config.slow_run_threshold_seconds, + ) + + +def _build_resource(config: ObservabilityConfig) -> Resource: + return Resource.create( + { + SERVICE_NAME: config.service_name, + SERVICE_NAMESPACE: "policyengine", + DEPLOYMENT_ENVIRONMENT: config.environment, + SERVICE_INSTANCE_ID: socket.gethostname(), + } + ) + + +def _build_real_observability(config: ObservabilityConfig) -> Observability: + from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( + OTLPMetricExporter, + ) + from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter, + ) + from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler + from opentelemetry.sdk._logs.export import BatchLogRecordProcessor + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import ( + PeriodicExportingMetricReader, + ) + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + + resource = _build_resource(config) + + tracer_provider = TracerProvider(resource=resource) + meter_provider = MeterProvider(resource=resource) + logger_provider = None + logging_handler = None + + if config.otlp_endpoint: + headers = config.otlp_headers or None + span_exporter = OTLPSpanExporter( + endpoint=f"{config.otlp_endpoint.rstrip('/')}/v1/traces", + headers=headers, + ) + tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter)) + + metric_exporter = OTLPMetricExporter( + endpoint=f"{config.otlp_endpoint.rstrip('/')}/v1/metrics", + headers=headers, + ) + meter_provider = MeterProvider( + resource=resource, + metric_readers=[PeriodicExportingMetricReader(metric_exporter)], + ) + + from opentelemetry.exporter.otlp.proto.http._log_exporter import ( + OTLPLogExporter, + ) + + logger_provider = LoggerProvider(resource=resource) + logger_provider.add_log_record_processor( + BatchLogRecordProcessor( + OTLPLogExporter( + endpoint=f"{config.otlp_endpoint.rstrip('/')}/v1/logs", + headers=headers, + ) + ) + ) + logging_handler = LoggingHandler( + level=logging.INFO, + logger_provider=logger_provider, + ) + + tracer = tracer_provider.get_tracer(config.service_name) + meter = meter_provider.get_meter(config.service_name) + + lifecycle_logger = logging.getLogger( + f"policyengine.observability.{config.service_name}" + ) + lifecycle_logger.setLevel(logging.INFO) + lifecycle_logger.propagate = False + + lifecycle_logger.handlers = [] + stream_handler = logging.StreamHandler() + formatter = JsonPayloadFormatter() + stream_handler.setFormatter(formatter) + lifecycle_logger.addHandler(stream_handler) + if logging_handler is not None: + logging_handler.setFormatter(formatter) + lifecycle_logger.addHandler(logging_handler) + + return OtlpObservability( + config=config, + tracer=tracer, + meter=meter, + lifecycle_logger=lifecycle_logger, + tracer_provider=tracer_provider, + meter_provider=meter_provider, + logger_provider=logger_provider, + ) def build_observability( config: ObservabilityConfig | None = None, ) -> Observability: - """Central construction point for observability emitters. - - Commit 1 intentionally returns a no-op implementation even when enabled. - Later commits can swap in a real backend here without changing callers. - """ - if config is None: config = ObservabilityConfig.disabled() - return NoOpObservability(config=config) + if not config.enabled: + return NoOpObservability(config=config) + + key = _cache_key(config) + cached = _CACHE.get(key) + if cached is not None: + return cached + + built = _build_real_observability(config) + _CACHE[key] = built + return built def get_observability( config: ObservabilityConfig | None = None, ) -> Observability: return build_observability(config) + + +def reset_observability_cache() -> None: + for value in _CACHE.values(): + value.flush() + tracer_provider = getattr(value, "tracer_provider", None) + if tracer_provider is not None: + tracer_provider.shutdown() + meter_provider = getattr(value, "meter_provider", None) + if meter_provider is not None: + meter_provider.shutdown() + logger_provider = getattr(value, "logger_provider", None) + if logger_provider is not None: + logger_provider.shutdown() + _CACHE.clear() diff --git a/libs/policyengine-fastapi/tests/test_observability.py b/libs/policyengine-fastapi/tests/test_observability.py index 9323ecdc2..ae2141a44 100644 --- a/libs/policyengine-fastapi/tests/test_observability.py +++ b/libs/policyengine-fastapi/tests/test_observability.py @@ -1,8 +1,12 @@ from datetime import datetime, UTC +from opentelemetry.sdk._logs.export import LogExportResult +from opentelemetry.sdk.metrics.export import MetricExportResult +from opentelemetry.sdk.trace.export import SpanExportResult from policyengine_fastapi.observability import ( NoOpObservability, ObservabilityConfig, + OtlpObservability, SimulationCompositeTraceResponse, SimulationLifecycleEvent, SimulationRunSummary, @@ -15,7 +19,9 @@ build_observability, generate_run_id, get_observability, + parse_bool, parse_header_value_pairs, + reset_observability_cache, stable_config_hash, ) from pydantic import ValidationError @@ -39,6 +45,13 @@ def test_parse_header_value_pairs__raises_for_invalid_pair(): raise AssertionError("Expected invalid OTLP header parsing to fail") +def test_parse_bool__supports_common_truthy_and_falsey_values(): + assert parse_bool("true") is True + assert parse_bool("YES") is True + assert parse_bool("0") is False + assert parse_bool(None, default=True) is True + + def test_observability_config_disabled__returns_disabled_defaults(): config = ObservabilityConfig.disabled(service_name="test-service") @@ -169,17 +182,86 @@ def test_observability_provider__returns_noop_for_disabled_defaults(): assert observability.config == ObservabilityConfig.disabled() -def test_observability_provider__preserves_supplied_config(): +def test_observability_provider__preserves_supplied_config(monkeypatch): + reset_observability_cache() + monkeypatch.setattr( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export", + lambda self, spans: SpanExportResult.SUCCESS, + ) + monkeypatch.setattr( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter.export", + lambda self, metrics_data, timeout_millis=10000: MetricExportResult.SUCCESS, + ) + monkeypatch.setattr( + "opentelemetry.exporter.otlp.proto.http._log_exporter.OTLPLogExporter.export", + lambda self, batch: LogExportResult.SUCCESS, + ) config = ObservabilityConfig( enabled=True, service_name="simulation-worker", + otlp_endpoint="https://otlp.example", tracer_capture_mode=TracerCaptureMode.THRESHOLD, ) built = build_observability(config) fetched = get_observability(config) - assert isinstance(built, NoOpObservability) + assert isinstance(built, OtlpObservability) assert built.config == config - assert isinstance(fetched, NoOpObservability) - assert fetched.config == config + assert isinstance(fetched, OtlpObservability) + assert fetched is built + built.emit_lifecycle_event( + SimulationLifecycleEvent( + event_name="simulation.stage.completed", + stage=SimulationStage.WORKER_COMPLETED, + status="ok", + timestamp=datetime(2026, 4, 9, 20, 0, tzinfo=UTC), + service="policyengine-simulation-worker", + run_id="run-456", + ) + ) + built.emit_counter( + "policyengine.simulation.run.count", + attributes={"status": "submitted"}, + ) + built.emit_histogram( + "policyengine.simulation.run.duration.seconds", + 1.23, + attributes={"status": "complete"}, + ) + with built.span("run_simulation", attributes={"run_id": "run-456"}) as span: + span.add_event("simulation.completed") + assert span.get_traceparent() is not None + built.flush() + reset_observability_cache() + + +def test_otlp_observability__accepts_parent_traceparent(monkeypatch): + reset_observability_cache() + monkeypatch.setattr( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export", + lambda self, spans: SpanExportResult.SUCCESS, + ) + monkeypatch.setattr( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter.export", + lambda self, metrics_data, timeout_millis=10000: MetricExportResult.SUCCESS, + ) + monkeypatch.setattr( + "opentelemetry.exporter.otlp.proto.http._log_exporter.OTLPLogExporter.export", + lambda self, batch: LogExportResult.SUCCESS, + ) + config = ObservabilityConfig( + enabled=True, + service_name="simulation-worker", + otlp_endpoint="https://otlp.example", + ) + + built = build_observability(config) + + with built.span( + "run_simulation", + parent_traceparent="00-11111111111111111111111111111111-2222222222222222-01", + ) as span: + assert span.get_traceparent() is not None + + reset_observability_cache() diff --git a/libs/policyengine-fastapi/uv.lock b/libs/policyengine-fastapi/uv.lock index 189874abd..640276ae8 100644 --- a/libs/policyengine-fastapi/uv.lock +++ b/libs/policyengine-fastapi/uv.lock @@ -479,16 +479,44 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/6b/fd/5390ec4f49100f3ecb9968a392f9e6d039f1e3fe0ecd28443716ff01e589/fastar-0.11.0-cp314-cp314t-win_arm64.whl", hash = "sha256:76c1359314355eafbc6989f20fb1ad565a3d10200117923b9da765a17e2f6f11", size = 461049, upload-time = "2026-04-13T17:11:25.918Z" }, ] +[[package]] +name = "google-api-core" +version = "2.25.2" +source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version >= '3.14'", +] +dependencies = [ + { name = "google-auth", marker = "python_full_version >= '3.14'" }, + { name = "googleapis-common-protos", marker = "python_full_version >= '3.14'" }, + { name = "proto-plus", marker = "python_full_version >= '3.14'" }, + { name = "protobuf", marker = "python_full_version >= '3.14'" }, + { name = "requests", marker = "python_full_version >= '3.14'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/09/cd/63f1557235c2440fe0577acdbc32577c5c002684c58c7f4d770a92366a24/google_api_core-2.25.2.tar.gz", hash = "sha256:1c63aa6af0d0d5e37966f157a77f9396d820fba59f9e43e9415bc3dc5baff300", size = 166266, upload-time = "2025-10-03T00:07:34.778Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c8/d8/894716a5423933f5c8d2d5f04b16f052a515f78e815dab0c2c6f1fd105dc/google_api_core-2.25.2-py3-none-any.whl", hash = "sha256:e9a8f62d363dc8424a8497f4c2a47d6bcda6c16514c935629c257ab5d10210e7", size = 162489, upload-time = "2025-10-03T00:07:32.924Z" }, +] + +[package.optional-dependencies] +grpc = [ + { name = "grpcio", marker = "python_full_version >= '3.14'" }, + { name = "grpcio-status", marker = "python_full_version >= '3.14'" }, +] + [[package]] name = "google-api-core" version = "2.30.3" source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version < '3.14'", +] dependencies = [ - { name = "google-auth" }, - { name = "googleapis-common-protos" }, - { name = "proto-plus" }, - { name = "protobuf" }, - { name = "requests" }, + { name = "google-auth", marker = "python_full_version < '3.14'" }, + { name = "googleapis-common-protos", marker = "python_full_version < '3.14'" }, + { name = "proto-plus", marker = "python_full_version < '3.14'" }, + { name = "protobuf", marker = "python_full_version < '3.14'" }, + { name = "requests", marker = "python_full_version < '3.14'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/16/ce/502a57fb0ec752026d24df1280b162294b22a0afb98a326084f9a979138b/google_api_core-2.30.3.tar.gz", hash = "sha256:e601a37f148585319b26db36e219df68c5d07b6382cff2d580e83404e44d641b", size = 177001, upload-time = "2026-04-10T00:41:28.035Z" } wheels = [ @@ -497,8 +525,8 @@ wheels = [ [package.optional-dependencies] grpc = [ - { name = "grpcio" }, - { name = "grpcio-status" }, + { name = "grpcio", marker = "python_full_version < '3.14'" }, + { name = "grpcio-status", marker = "python_full_version < '3.14'" }, ] [[package]] @@ -519,7 +547,8 @@ name = "google-cloud-monitoring" version = "2.30.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "google-api-core", extra = ["grpc"] }, + { name = "google-api-core", version = "2.25.2", source = { registry = "https://pypi.org/simple" }, extra = ["grpc"], marker = "python_full_version >= '3.14'" }, + { name = "google-api-core", version = "2.30.3", source = { registry = "https://pypi.org/simple" }, extra = ["grpc"], marker = "python_full_version < '3.14'" }, { name = "google-auth" }, { name = "grpcio" }, { name = "proto-plus" }, @@ -535,7 +564,8 @@ name = "google-cloud-trace" version = "1.19.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "google-api-core", extra = ["grpc"] }, + { name = "google-api-core", version = "2.25.2", source = { registry = "https://pypi.org/simple" }, extra = ["grpc"], marker = "python_full_version >= '3.14'" }, + { name = "google-api-core", version = "2.30.3", source = { registry = "https://pypi.org/simple" }, extra = ["grpc"], marker = "python_full_version < '3.14'" }, { name = "google-auth" }, { name = "grpcio" }, { name = "proto-plus" }, @@ -567,7 +597,9 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7a/75/7e9cd1126a1e1f0cd67b0eda02e5221b28488d352684704a78ed505bd719/greenlet-3.4.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:43748988b097f9c6f09364f260741aa73c80747f63389824435c7a50bfdfd5c1", size = 285856, upload-time = "2026-04-08T15:52:45.82Z" }, { url = "https://files.pythonhosted.org/packages/9d/c4/3e2df392e5cb199527c4d9dbcaa75c14edcc394b45040f0189f649631e3c/greenlet-3.4.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:5566e4e2cd7a880e8c27618e3eab20f3494452d12fd5129edef7b2f7aa9a36d1", size = 610208, upload-time = "2026-04-08T16:24:39.674Z" }, { url = "https://files.pythonhosted.org/packages/da/af/750cdfda1d1bd30a6c28080245be8d0346e669a98fdbae7f4102aa95fff3/greenlet-3.4.0-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:1054c5a3c78e2ab599d452f23f7adafef55062a783a8e241d24f3b633ba6ff82", size = 621269, upload-time = "2026-04-08T16:30:59.767Z" }, + { url = "https://files.pythonhosted.org/packages/e0/93/c8c508d68ba93232784bbc1b5474d92371f2897dfc6bc281b419f2e0d492/greenlet-3.4.0-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:98eedd1803353daf1cd9ef23eef23eda5a4d22f99b1f998d273a8b78b70dd47f", size = 628455, upload-time = "2026-04-08T16:40:40.698Z" }, { url = "https://files.pythonhosted.org/packages/54/78/0cbc693622cd54ebe25207efbb3a0eb07c2639cb8594f6e3aaaa0bb077a8/greenlet-3.4.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f82cb6cddc27dd81c96b1506f4aa7def15070c3b2a67d4e46fd19016aacce6cf", size = 617549, upload-time = "2026-04-08T15:56:34.893Z" }, + { url = "https://files.pythonhosted.org/packages/7f/46/cfaaa0ade435a60550fd83d07dfd5c41f873a01da17ede5c4cade0b9bab8/greenlet-3.4.0-cp313-cp313-manylinux_2_39_riscv64.whl", hash = "sha256:b7857e2202aae67bc5725e0c1f6403c20a8ff46094ece015e7d474f5f7020b55", size = 426238, upload-time = "2026-04-08T16:43:06.865Z" }, { url = "https://files.pythonhosted.org/packages/ba/c0/8966767de01343c1ff47e8b855dc78e7d1a8ed2b7b9c83576a57e289f81d/greenlet-3.4.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:227a46251ecba4ff46ae742bc5ce95c91d5aceb4b02f885487aff269c127a729", size = 1575310, upload-time = "2026-04-08T16:26:21.671Z" }, { url = "https://files.pythonhosted.org/packages/b8/38/bcdc71ba05e9a5fda87f63ffc2abcd1f15693b659346df994a48c968003d/greenlet-3.4.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:5b99e87be7eba788dd5b75ba1cde5639edffdec5f91fe0d734a249535ec3408c", size = 1640435, upload-time = "2026-04-08T15:57:32.572Z" }, { url = "https://files.pythonhosted.org/packages/a1/c2/19b664b7173b9e4ef5f77e8cef9f14c20ec7fce7920dc1ccd7afd955d093/greenlet-3.4.0-cp313-cp313-win_amd64.whl", hash = "sha256:849f8bc17acd6295fcb5de8e46d55cc0e52381c56eaf50a2afd258e97bc65940", size = 238760, upload-time = "2026-04-08T17:04:03.878Z" }, @@ -575,7 +607,9 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/78/02/bde66806e8f169cf90b14d02c500c44cdbe02c8e224c9c67bafd1b8cadd1/greenlet-3.4.0-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:10a07aca6babdd18c16a3f4f8880acfffc2b88dfe431ad6aa5f5740759d7d75e", size = 286291, upload-time = "2026-04-08T17:09:34.307Z" }, { url = "https://files.pythonhosted.org/packages/05/1f/39da1c336a87d47c58352fb8a78541ce63d63ae57c5b9dae1fe02801bbc2/greenlet-3.4.0-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:076e21040b3a917d3ce4ad68fb5c3c6b32f1405616c4a57aa83120979649bd3d", size = 656749, upload-time = "2026-04-08T16:24:41.721Z" }, { url = "https://files.pythonhosted.org/packages/d3/6c/90ee29a4ee27af7aa2e2ec408799eeb69ee3fcc5abcecac6ddd07a5cd0f2/greenlet-3.4.0-cp314-cp314-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e82689eea4a237e530bb5cb41b180ef81fa2160e1f89422a67be7d90da67f615", size = 669084, upload-time = "2026-04-08T16:31:01.372Z" }, + { url = "https://files.pythonhosted.org/packages/d2/4a/74078d3936712cff6d3c91a930016f476ce4198d84e224fe6d81d3e02880/greenlet-3.4.0-cp314-cp314-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:06c2d3b89e0c62ba50bd7adf491b14f39da9e7e701647cb7b9ff4c99bee04b19", size = 673405, upload-time = "2026-04-08T16:40:42.527Z" }, { url = "https://files.pythonhosted.org/packages/07/49/d4cad6e5381a50947bb973d2f6cf6592621451b09368b8c20d9b8af49c5b/greenlet-3.4.0-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4df3b0b2289ec686d3c821a5fee44259c05cfe824dd5e6e12c8e5f5df23085cf", size = 665621, upload-time = "2026-04-08T15:56:35.995Z" }, + { url = "https://files.pythonhosted.org/packages/79/3e/df8a83ab894751bc31e1106fdfaa80ca9753222f106b04de93faaa55feb7/greenlet-3.4.0-cp314-cp314-manylinux_2_39_riscv64.whl", hash = "sha256:070b8bac2ff3b4d9e0ff36a0d19e42103331d9737e8504747cd1e659f76297bd", size = 471670, upload-time = "2026-04-08T16:43:08.512Z" }, { url = "https://files.pythonhosted.org/packages/37/31/d1edd54f424761b5d47718822f506b435b6aab2f3f93b465441143ea5119/greenlet-3.4.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:8bff29d586ea415688f4cec96a591fcc3bf762d046a796cdadc1fdb6e7f2d5bf", size = 1622259, upload-time = "2026-04-08T16:26:23.201Z" }, { url = "https://files.pythonhosted.org/packages/b0/c6/6d3f9cdcb21c4e12a79cb332579f1c6aa1af78eb68059c5a957c7812d95e/greenlet-3.4.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:8a569c2fb840c53c13a2b8967c63621fafbd1a0e015b9c82f408c33d626a2fda", size = 1686916, upload-time = "2026-04-08T15:57:34.282Z" }, { url = "https://files.pythonhosted.org/packages/63/45/c1ca4a1ad975de4727e52d3ffe641ae23e1d7a8ffaa8ff7a0477e1827b92/greenlet-3.4.0-cp314-cp314-win_amd64.whl", hash = "sha256:207ba5b97ea8b0b60eb43ffcacf26969dd83726095161d676aac03ff913ee50d", size = 239821, upload-time = "2026-04-08T17:03:48.423Z" }, @@ -583,7 +617,9 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d4/8f/18d72b629783f5e8d045a76f5325c1e938e659a9e4da79c7dcd10169a48d/greenlet-3.4.0-cp314-cp314t-macosx_11_0_universal2.whl", hash = "sha256:d70012e51df2dbbccfaf63a40aaf9b40c8bed37c3e3a38751c926301ce538ece", size = 294681, upload-time = "2026-04-08T15:52:35.778Z" }, { url = "https://files.pythonhosted.org/packages/9e/ad/5fa86ec46769c4153820d58a04062285b3b9e10ba3d461ee257b68dcbf53/greenlet-3.4.0-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a58bec0751f43068cd40cff31bb3ca02ad6000b3a51ca81367af4eb5abc480c8", size = 658899, upload-time = "2026-04-08T16:24:43.32Z" }, { url = "https://files.pythonhosted.org/packages/43/f0/4e8174ca0e87ae748c409f055a1ba161038c43cc0a5a6f1433a26ac2e5bf/greenlet-3.4.0-cp314-cp314t-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:05fa0803561028f4b2e3b490ee41216a842eaee11aed004cc343a996d9523aa2", size = 665284, upload-time = "2026-04-08T16:31:02.833Z" }, + { url = "https://files.pythonhosted.org/packages/ef/92/466b0d9afd44b8af623139a3599d651c7564fa4152f25f117e1ee5949ffb/greenlet-3.4.0-cp314-cp314t-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:c4cd56a9eb7a6444edbc19062f7b6fbc8f287c663b946e3171d899693b1c19fa", size = 665872, upload-time = "2026-04-08T16:40:43.912Z" }, { url = "https://files.pythonhosted.org/packages/19/da/991cf7cd33662e2df92a1274b7eb4d61769294d38a1bba8a45f31364845e/greenlet-3.4.0-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e60d38719cb80b3ab5e85f9f1aed4960acfde09868af6762ccb27b260d68f4ed", size = 661861, upload-time = "2026-04-08T15:56:37.269Z" }, + { url = "https://files.pythonhosted.org/packages/0d/14/3395a7ef3e260de0325152ddfe19dffb3e49fe10873b94654352b53ad48e/greenlet-3.4.0-cp314-cp314t-manylinux_2_39_riscv64.whl", hash = "sha256:1f85f204c4d54134ae850d401fa435c89cd667d5ce9dc567571776b45941af72", size = 489237, upload-time = "2026-04-08T16:43:09.993Z" }, { url = "https://files.pythonhosted.org/packages/36/c5/6c2c708e14db3d9caea4b459d8464f58c32047451142fe2cfd90e7458f41/greenlet-3.4.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:7f50c804733b43eded05ae694691c9aa68bca7d0a867d67d4a3f514742a2d53f", size = 1622182, upload-time = "2026-04-08T16:26:24.777Z" }, { url = "https://files.pythonhosted.org/packages/7a/4c/50c5fed19378e11a29fabab1f6be39ea95358f4a0a07e115a51ca93385d8/greenlet-3.4.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:2d4f0635dc4aa638cda4b2f5a07ae9a2cff9280327b581a3fcb6f317b4fbc38a", size = 1685050, upload-time = "2026-04-08T15:57:36.453Z" }, { url = "https://files.pythonhosted.org/packages/db/72/85ae954d734703ab48e622c59d4ce35d77ce840c265814af9c078cacc7aa/greenlet-3.4.0-cp314-cp314t-win_amd64.whl", hash = "sha256:1a4a48f24681300c640f143ba7c404270e1ebbbcf34331d7104a4ff40f8ea705", size = 245554, upload-time = "2026-04-08T17:03:50.044Z" }, @@ -622,16 +658,16 @@ wheels = [ [[package]] name = "grpcio-status" -version = "1.80.0" +version = "1.71.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "googleapis-common-protos" }, { name = "grpcio" }, { name = "protobuf" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/b1/ed/105f619bdd00cb47a49aa2feea6232ea2bbb04199d52a22cc6a7d603b5cb/grpcio_status-1.80.0.tar.gz", hash = "sha256:df73802a4c89a3ea88aa2aff971e886fccce162bc2e6511408b3d67a144381cd", size = 13901, upload-time = "2026-03-30T08:54:34.784Z" } +sdist = { url = "https://files.pythonhosted.org/packages/fd/d1/b6e9877fedae3add1afdeae1f89d1927d296da9cf977eca0eb08fb8a460e/grpcio_status-1.71.2.tar.gz", hash = "sha256:c7a97e176df71cdc2c179cd1847d7fc86cca5832ad12e9798d7fed6b7a1aab50", size = 13677, upload-time = "2025-06-28T04:24:05.426Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/76/80/58cd2dfc19a07d022abe44bde7c365627f6c7cb6f692ada6c65ca437d09a/grpcio_status-1.80.0-py3-none-any.whl", hash = "sha256:4b56990363af50dbf2c2ebb80f1967185c07d87aa25aa2bea45ddb75fc181dbe", size = 14638, upload-time = "2026-03-30T08:54:01.569Z" }, + { url = "https://files.pythonhosted.org/packages/67/58/317b0134129b556a93a3b0afe00ee675b5657f0155509e22fcb853bafe2d/grpcio_status-1.71.2-py3-none-any.whl", hash = "sha256:803c98cb6a8b7dc6dbb785b1111aed739f241ab5e9da0bba96888aa74704cfd3", size = 14424, upload-time = "2025-06-28T04:23:42.136Z" }, ] [[package]] @@ -869,6 +905,36 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5f/4a/876703e8c5845198d95cd4006c8d1b2e3b129a9e288558e33133360f8d5d/opentelemetry_exporter_gcp_trace-1.11.0-py3-none-any.whl", hash = "sha256:b3dcb314e1a9985e9185cb7720b693eb393886fde98ae4c095ffc0893de6cefa", size = 14016, upload-time = "2025-11-04T19:32:09.009Z" }, ] +[[package]] +name = "opentelemetry-exporter-otlp-proto-common" +version = "1.30.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-proto" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a2/d7/44098bf1ef89fc5810cdbda05faa2ae9322a0dbda4921cdc965dc68a9856/opentelemetry_exporter_otlp_proto_common-1.30.0.tar.gz", hash = "sha256:ddbfbf797e518411857d0ca062c957080279320d6235a279f7b64ced73c13897", size = 19640, upload-time = "2025-02-04T18:17:16.234Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ee/54/f4b3de49f8d7d3a78fd6e6e1a6fd27dd342eb4d82c088b9078c6a32c3808/opentelemetry_exporter_otlp_proto_common-1.30.0-py3-none-any.whl", hash = "sha256:5468007c81aa9c44dc961ab2cf368a29d3475977df83b4e30aeed42aa7bc3b38", size = 18747, upload-time = "2025-02-04T18:16:51.512Z" }, +] + +[[package]] +name = "opentelemetry-exporter-otlp-proto-http" +version = "1.30.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "deprecated" }, + { name = "googleapis-common-protos" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-common" }, + { name = "opentelemetry-proto" }, + { name = "opentelemetry-sdk" }, + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/04/f9/abb9191d536e6a2e2b7903f8053bf859a76bf784e3ca19a5749550ef19e4/opentelemetry_exporter_otlp_proto_http-1.30.0.tar.gz", hash = "sha256:c3ae75d4181b1e34a60662a6814d0b94dd33b628bee5588a878bed92cee6abdc", size = 15073, upload-time = "2025-02-04T18:17:18.446Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e1/3c/cdf34bc459613f2275aff9b258f35acdc4c4938dad161d17437de5d4c034/opentelemetry_exporter_otlp_proto_http-1.30.0-py3-none-any.whl", hash = "sha256:9578e790e579931c5ffd50f1e6975cbdefb6a0a0a5dea127a6ae87df10e0a589", size = 17245, upload-time = "2025-02-04T18:16:53.514Z" }, +] + [[package]] name = "opentelemetry-instrumentation" version = "0.51b0" @@ -945,6 +1011,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/12/d4/b68c3b3388dd5107f3ed532747e112249c152ba44af71a1f96673d66e3ee/opentelemetry_instrumentation_sqlalchemy-0.51b0-py3-none-any.whl", hash = "sha256:5ff4816228b496aef1511149e2b17a25e0faacec4d5eb65bf18a9964af40f1af", size = 14132, upload-time = "2025-02-04T18:20:50.513Z" }, ] +[[package]] +name = "opentelemetry-proto" +version = "1.30.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/31/6e/c1ff2e3b0cd3a189a6be03fd4d63441d73d7addd9117ab5454e667b9b6c7/opentelemetry_proto-1.30.0.tar.gz", hash = "sha256:afe5c9c15e8b68d7c469596e5b32e8fc085eb9febdd6fb4e20924a93a0389179", size = 34362, upload-time = "2025-02-04T18:17:28.099Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/56/d7/85de6501f7216995295f7ec11e470142e6a6e080baacec1753bbf272e007/opentelemetry_proto-1.30.0-py3-none-any.whl", hash = "sha256:c6290958ff3ddacc826ca5abbeb377a31c2334387352a259ba0df37c243adc11", size = 55854, upload-time = "2025-02-04T18:17:08.024Z" }, +] + [[package]] name = "opentelemetry-resourcedetector-gcp" version = "1.11.0a0" @@ -1040,6 +1118,7 @@ dependencies = [ { name = "fastapi", extra = ["standard"] }, { name = "opentelemetry-exporter-gcp-monitoring" }, { name = "opentelemetry-exporter-gcp-trace" }, + { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-instrumentation-fastapi" }, { name = "opentelemetry-instrumentation-logging" }, { name = "opentelemetry-instrumentation-sqlalchemy" }, @@ -1067,6 +1146,7 @@ requires-dist = [ { name = "fastapi", extras = ["standard"], specifier = ">=0.115.8,<0.116.0" }, { name = "opentelemetry-exporter-gcp-monitoring", specifier = ">=1.9.0a0,<2.0.0" }, { name = "opentelemetry-exporter-gcp-trace", specifier = ">=1.9.0,<2.0.0" }, + { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.30.0,<2.0.0" }, { name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.51b0,<0.52" }, { name = "opentelemetry-instrumentation-logging", specifier = ">=0.51b0,<0.52" }, { name = "opentelemetry-instrumentation-sqlalchemy", specifier = ">=0.51b0" }, @@ -1096,17 +1176,16 @@ wheels = [ [[package]] name = "protobuf" -version = "6.33.6" +version = "5.29.6" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/66/70/e908e9c5e52ef7c3a6c7902c9dfbb34c7e29c25d2f81ade3856445fd5c94/protobuf-6.33.6.tar.gz", hash = "sha256:a6768d25248312c297558af96a9f9c929e8c4cee0659cb07e780731095f38135", size = 444531, upload-time = "2026-03-18T19:05:00.988Z" } +sdist = { url = "https://files.pythonhosted.org/packages/7e/57/394a763c103e0edf87f0938dafcd918d53b4c011dfc5c8ae80f3b0452dbb/protobuf-5.29.6.tar.gz", hash = "sha256:da9ee6a5424b6b30fd5e45c5ea663aef540ca95f9ad99d1e887e819cdf9b8723", size = 425623, upload-time = "2026-02-04T22:54:40.584Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/fc/9f/2f509339e89cfa6f6a4c4ff50438db9ca488dec341f7e454adad60150b00/protobuf-6.33.6-cp310-abi3-win32.whl", hash = "sha256:7d29d9b65f8afef196f8334e80d6bc1d5d4adedb449971fefd3723824e6e77d3", size = 425739, upload-time = "2026-03-18T19:04:48.373Z" }, - { url = "https://files.pythonhosted.org/packages/76/5d/683efcd4798e0030c1bab27374fd13a89f7c2515fb1f3123efdfaa5eab57/protobuf-6.33.6-cp310-abi3-win_amd64.whl", hash = "sha256:0cd27b587afca21b7cfa59a74dcbd48a50f0a6400cfb59391340ad729d91d326", size = 437089, upload-time = "2026-03-18T19:04:50.381Z" }, - { url = "https://files.pythonhosted.org/packages/5c/01/a3c3ed5cd186f39e7880f8303cc51385a198a81469d53d0fdecf1f64d929/protobuf-6.33.6-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:9720e6961b251bde64edfdab7d500725a2af5280f3f4c87e57c0208376aa8c3a", size = 427737, upload-time = "2026-03-18T19:04:51.866Z" }, - { url = "https://files.pythonhosted.org/packages/ee/90/b3c01fdec7d2f627b3a6884243ba328c1217ed2d978def5c12dc50d328a3/protobuf-6.33.6-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:e2afbae9b8e1825e3529f88d514754e094278bb95eadc0e199751cdd9a2e82a2", size = 324610, upload-time = "2026-03-18T19:04:53.096Z" }, - { url = "https://files.pythonhosted.org/packages/9b/ca/25afc144934014700c52e05103c2421997482d561f3101ff352e1292fb81/protobuf-6.33.6-cp39-abi3-manylinux2014_s390x.whl", hash = "sha256:c96c37eec15086b79762ed265d59ab204dabc53056e3443e702d2681f4b39ce3", size = 339381, upload-time = "2026-03-18T19:04:54.616Z" }, - { url = "https://files.pythonhosted.org/packages/16/92/d1e32e3e0d894fe00b15ce28ad4944ab692713f2e7f0a99787405e43533a/protobuf-6.33.6-cp39-abi3-manylinux2014_x86_64.whl", hash = "sha256:e9db7e292e0ab79dd108d7f1a94fe31601ce1ee3f7b79e0692043423020b0593", size = 323436, upload-time = "2026-03-18T19:04:55.768Z" }, - { url = "https://files.pythonhosted.org/packages/c4/72/02445137af02769918a93807b2b7890047c32bfb9f90371cbc12688819eb/protobuf-6.33.6-py3-none-any.whl", hash = "sha256:77179e006c476e69bf8e8ce866640091ec42e1beb80b213c3900006ecfba6901", size = 170656, upload-time = "2026-03-18T19:04:59.826Z" }, + { url = "https://files.pythonhosted.org/packages/d4/88/9ee58ff7863c479d6f8346686d4636dd4c415b0cbeed7a6a7d0617639c2a/protobuf-5.29.6-cp310-abi3-win32.whl", hash = "sha256:62e8a3114992c7c647bce37dcc93647575fc52d50e48de30c6fcb28a6a291eb1", size = 423357, upload-time = "2026-02-04T22:54:25.805Z" }, + { url = "https://files.pythonhosted.org/packages/1c/66/2dc736a4d576847134fb6d80bd995c569b13cdc7b815d669050bf0ce2d2c/protobuf-5.29.6-cp310-abi3-win_amd64.whl", hash = "sha256:7e6ad413275be172f67fdee0f43484b6de5a904cc1c3ea9804cb6fe2ff366eda", size = 435175, upload-time = "2026-02-04T22:54:28.592Z" }, + { url = "https://files.pythonhosted.org/packages/06/db/49b05966fd208ae3f44dcd33837b6243b4915c57561d730a43f881f24dea/protobuf-5.29.6-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:b5a169e664b4057183a34bdc424540e86eea47560f3c123a0d64de4e137f9269", size = 418619, upload-time = "2026-02-04T22:54:30.266Z" }, + { url = "https://files.pythonhosted.org/packages/b7/d7/48cbf6b0c3c39761e47a99cb483405f0fde2be22cf00d71ef316ce52b458/protobuf-5.29.6-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:a8866b2cff111f0f863c1b3b9e7572dc7eaea23a7fae27f6fc613304046483e6", size = 320284, upload-time = "2026-02-04T22:54:31.782Z" }, + { url = "https://files.pythonhosted.org/packages/e3/dd/cadd6ec43069247d91f6345fa7a0d2858bef6af366dbd7ba8f05d2c77d3b/protobuf-5.29.6-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:e3387f44798ac1106af0233c04fb8abf543772ff241169946f698b3a9a3d3ab9", size = 320478, upload-time = "2026-02-04T22:54:32.909Z" }, + { url = "https://files.pythonhosted.org/packages/5a/cb/e3065b447186cb70aa65acc70c86baf482d82bf75625bf5a2c4f6919c6a3/protobuf-5.29.6-py3-none-any.whl", hash = "sha256:6b9edb641441b2da9fa8f428760fc136a49cf97a52076010cf22a2ff73438a86", size = 173126, upload-time = "2026-02-04T22:54:39.462Z" }, ] [[package]] diff --git a/projects/policyengine-api-simulation/fixtures/gateway/test_endpoints.py b/projects/policyengine-api-simulation/fixtures/gateway/test_endpoints.py index dcfce57d6..b507fbc32 100644 --- a/projects/policyengine-api-simulation/fixtures/gateway/test_endpoints.py +++ b/projects/policyengine-api-simulation/fixtures/gateway/test_endpoints.py @@ -29,40 +29,39 @@ def from_name(cls, name: str): class MockFunctionCall: """Mock for Modal FunctionCall returned by spawn.""" - registry = {} - - def __init__(self, object_id: str = "mock-job-id-123"): + def __init__( + self, + object_id: str = "mock-job-id-123", + result: dict | None = None, + error: Exception | None = None, + is_running: bool = False, + ): self.object_id = object_id - self.result = {"budget": {"total": 1000000}} - self.error = None - self.running = False - self.__class__.registry[object_id] = self + self._result = {"budget": {"total": 1000000}} if result is None else result + self._error = error + self._is_running = is_running def get(self, timeout: int = 0): - if self.running: + if self._is_running: raise TimeoutError() - if self.error is not None: - raise self.error - return self.result - - @classmethod - def from_id(cls, object_id: str): - if object_id not in cls.registry: - raise KeyError(object_id) - return cls.registry[object_id] + if self._error is not None: + raise self._error + return self._result class MockFunction: """Mock for Modal Function.""" - def __init__(self): + def __init__(self, function_calls: dict[str, MockFunctionCall]): self.last_payload = None self.last_from_name_call = None - self.last_call = None + self.last_call = MockFunctionCall() + self._function_calls = function_calls def spawn(self, payload: dict) -> MockFunctionCall: self.last_payload = payload self.last_call = MockFunctionCall() + self._function_calls[self.last_call.object_id] = self.last_call return self.last_call @classmethod @@ -76,17 +75,18 @@ def mock_modal(monkeypatch): """Patch Modal calls in the gateway endpoints module.""" from src.modal.gateway import endpoints - mock_func = MockFunction() mock_dicts = {} - MockFunctionCall.registry = {} + function_calls = {} + mock_func = MockFunction(function_calls) class MockModalDict: @staticmethod def from_name(name: str, create_if_missing: bool = False): - if create_if_missing and name not in mock_dicts: - mock_dicts[name] = {} if name not in mock_dicts: - raise KeyError(f"Mock dict not configured for: {name}") + if create_if_missing: + mock_dicts[name] = {} + else: + raise KeyError(f"Mock dict not configured for: {name}") return MockDict(mock_dicts[name]) class MockModalFunction: @@ -95,14 +95,22 @@ def from_name(app_name: str, func_name: str): mock_func.last_from_name_call = (app_name, func_name) return mock_func + class MockModalFunctionCall: + @staticmethod + def from_id(job_id: str): + if job_id not in function_calls: + raise KeyError(job_id) + return function_calls[job_id] + class MockModal: Dict = MockModalDict Function = MockModalFunction - FunctionCall = MockFunctionCall + FunctionCall = MockModalFunctionCall monkeypatch.setattr(endpoints, "modal", MockModal) return { "func": mock_func, "dicts": mock_dicts, + "function_calls": function_calls, } diff --git a/projects/policyengine-api-simulation/src/modal/app.py b/projects/policyengine-api-simulation/src/modal/app.py index 51463ac6a..930873d2c 100644 --- a/projects/policyengine-api-simulation/src/modal/app.py +++ b/projects/policyengine-api-simulation/src/modal/app.py @@ -9,8 +9,19 @@ import modal import os +from time import perf_counter from src.modal._image_setup import snapshot_models +from src.modal.observability import ( + build_lifecycle_event, + build_metric_attributes, + build_span_attributes, + QUEUE_DURATION_METRIC_NAME, + FAILURE_COUNT_METRIC_NAME, + duration_since_requested_at, + get_observability, + parse_bool, +) # Get versions from environment or use defaults US_VERSION = os.environ.get("POLICYENGINE_US_VERSION", "1.562.3") @@ -40,6 +51,7 @@ def get_app_name(us_version: str, uk_version: str) -> str: gcp_secret = modal.Secret.from_name("gcp-credentials", environment_name="main") # Logfire secret is environment-specific logfire_secret = modal.Secret.from_name("policyengine-logfire") +observability_secret = modal.Secret.from_name("policyengine-observability") # Heavy image with model snapshot for simulation simulation_image = ( @@ -50,6 +62,8 @@ def get_app_name(us_version: str, uk_version: str) -> str: "policyengine==0.13.0", "tables>=3.10.2", "logfire", + "opentelemetry-sdk>=1.30.0,<2.0.0", + "opentelemetry-exporter-otlp-proto-http>=1.30.0,<2.0.0", ) .add_local_python_source("src.modal", copy=True) .run_function(snapshot_models) @@ -72,6 +86,18 @@ def configure_logfire(service_name: str = "policyengine-simulation"): ) +def should_use_logfire() -> bool: + token = os.environ.get("LOGFIRE_TOKEN", "") + if not token: + return False + if parse_bool(os.environ.get("OBSERVABILITY_ENABLED"), False) and not parse_bool( + os.environ.get("OBSERVABILITY_SHADOW_MODE"), + True, + ): + return False + return True + + @app.function( image=simulation_image, cpu=8.0, @@ -79,7 +105,7 @@ def configure_logfire(service_name: str = "policyengine-simulation"): timeout=3600, retries=0, max_containers=100, - secrets=[gcp_secret, logfire_secret], + secrets=[gcp_secret, logfire_secret, observability_secret], ) def run_simulation(params: dict) -> dict: """ @@ -92,15 +118,120 @@ def run_simulation(params: dict) -> dict: from src.modal.simulation import run_simulation_impl - configure_logfire() + observability = get_observability("policyengine-simulation-worker") + telemetry = dict(params.get("_telemetry") or {}) + telemetry.update( + { + "country": params.get("country"), + "country_package_version": ( + US_VERSION if params.get("country") == "us" else UK_VERSION + ), + "policyengine_version": "0.13.0", + "modal_app_name": APP_NAME, + } + ) + metric_attributes = build_metric_attributes( + telemetry, + service="policyengine-simulation-worker", + ) + start = perf_counter() + queue_duration = duration_since_requested_at(telemetry) + if queue_duration is not None: + observability.emit_histogram( + QUEUE_DURATION_METRIC_NAME, + queue_duration, + attributes=build_metric_attributes( + telemetry, + service="policyengine-simulation-worker", + stage="worker.started", + status="running", + ), + ) + + observability.emit_lifecycle_event( + build_lifecycle_event( + stage="worker.started", + status="running", + service="policyengine-simulation-worker", + telemetry=telemetry, + details={"queue_duration_seconds": queue_duration}, + ) + ) + + use_logfire = should_use_logfire() + if use_logfire: + configure_logfire() try: - with logfire.span( + with observability.span( "run_simulation", - input_params=params, - ) as span: - result = run_simulation_impl(params) - span.set_attribute("output_result", result) + build_span_attributes( + telemetry, + service="policyengine-simulation-worker", + ), + parent_traceparent=telemetry.get("traceparent"), + ) as otel_span: + otel_span.set_attribute("run_id", telemetry.get("run_id")) + if queue_duration is not None: + otel_span.set_attribute( + "queue_duration_seconds", + queue_duration, + ) + current_traceparent = otel_span.get_traceparent() + if current_traceparent: + telemetry["traceparent"] = current_traceparent + if use_logfire: + with logfire.span( + "run_simulation", + input_params=params, + ) as span: + result = run_simulation_impl(params, observability, telemetry) + span.set_attribute("output_result", result) + else: + result = run_simulation_impl(params, observability, telemetry) + + duration = perf_counter() - start + observability.emit_counter( + "policyengine.simulation.run.count", + attributes={**metric_attributes, "status": "complete"}, + ) + observability.emit_histogram( + "policyengine.simulation.run.duration.seconds", + duration, + attributes={**metric_attributes, "status": "complete"}, + ) + observability.emit_lifecycle_event( + build_lifecycle_event( + stage="worker.completed", + status="complete", + service="policyengine-simulation-worker", + telemetry=telemetry, + duration_seconds=duration, + ) + ) return result + except Exception as exc: + duration = perf_counter() - start + observability.emit_counter( + FAILURE_COUNT_METRIC_NAME, + attributes={ + **metric_attributes, + "stage": "result.failed", + "status": "failed", + }, + ) + observability.emit_lifecycle_event( + build_lifecycle_event( + stage="result.failed", + status="failed", + service="policyengine-simulation-worker", + telemetry=telemetry, + duration_seconds=duration, + details={"error": str(exc)}, + ) + ) + raise finally: - logfire.force_flush() + observability.flush() + if use_logfire: + logfire.force_flush() diff --git a/projects/policyengine-api-simulation/src/modal/gateway/app.py b/projects/policyengine-api-simulation/src/modal/gateway/app.py index e11c30212..00c447af1 100644 --- a/projects/policyengine-api-simulation/src/modal/gateway/app.py +++ b/projects/policyengine-api-simulation/src/modal/gateway/app.py @@ -12,6 +12,7 @@ # Stable app name - this should rarely change app = modal.App("policyengine-simulation-gateway") +observability_secret = modal.Secret.from_name("policyengine-observability") # Lightweight image for gateway - no heavy dependencies gateway_image = ( @@ -19,12 +20,14 @@ .pip_install( "fastapi>=0.115.0", "pydantic>=2.0", + "opentelemetry-sdk>=1.30.0,<2.0.0", + "opentelemetry-exporter-otlp-proto-http>=1.30.0,<2.0.0", ) .add_local_python_source("src.modal", copy=True) ) -@app.function(image=gateway_image) +@app.function(image=gateway_image, secrets=[observability_secret]) @modal.asgi_app() def web_app(): """ diff --git a/projects/policyengine-api-simulation/src/modal/gateway/endpoints.py b/projects/policyengine-api-simulation/src/modal/gateway/endpoints.py index a909425f7..535ec18e0 100644 --- a/projects/policyengine-api-simulation/src/modal/gateway/endpoints.py +++ b/projects/policyengine-api-simulation/src/modal/gateway/endpoints.py @@ -9,6 +9,7 @@ from fastapi import APIRouter, HTTPException from fastapi.responses import JSONResponse +from src.modal.gateway.observation import GatewayObservation from src.modal.gateway.models import ( JobStatusResponse, JobSubmitResponse, @@ -17,11 +18,16 @@ PolicyEngineBundle, SimulationRequest, ) +from src.modal.observability import ( + get_observability, +) logger = logging.getLogger(__name__) +observability = get_observability("policyengine-simulation-gateway") router = APIRouter() JOB_METADATA_DICT_NAME = "simulation-api-job-metadata" +JOB_TELEMETRY_DICT_NAME = "simulation-api-job-telemetry" DATASET_URIS = { "us": { "enhanced_cps": "hf://policyengine/policyengine-us-data/enhanced_cps_2024.h5@1.77.0", @@ -44,6 +50,10 @@ def _job_metadata_store(): return modal.Dict.from_name(JOB_METADATA_DICT_NAME, create_if_missing=True) +def get_job_telemetry_registry(): + return modal.Dict.from_name(JOB_TELEMETRY_DICT_NAME, create_if_missing=True) + + def _build_policyengine_bundle( country: str, resolved_version: str, payload: dict ) -> PolicyEngineBundle: @@ -72,6 +82,23 @@ def _serialize_job_metadata( } +def _build_status_metadata(job_id: str) -> tuple[dict, dict | None]: + job_metadata = dict(_job_metadata_store().get(job_id) or {}) + telemetry = None + try: + telemetry = dict(get_job_telemetry_registry().get(job_id) or {}) + except Exception: + telemetry = None + + if ( + "run_id" not in job_metadata + and telemetry is not None + and telemetry.get("run_id") is not None + ): + job_metadata["run_id"] = telemetry["run_id"] + return job_metadata, telemetry + + def get_app_name(country: str, version: Optional[str]) -> tuple[str, str]: """ Resolve country + version to Modal app name. @@ -85,13 +112,11 @@ def get_app_name(country: str, version: Optional[str]) -> tuple[str, str]: version_dict = modal.Dict.from_name(f"simulation-api-{country_lower}-versions") - # Resolve version if version is None: resolved_version = version_dict["latest"] else: resolved_version = version - # Get app name for this version try: app_name = version_dict[resolved_version] except KeyError: @@ -113,47 +138,93 @@ async def submit_simulation(request: SimulationRequest): Routes to the appropriate app based on country and version params. Returns immediately with job_id for polling. """ - try: - app_name, resolved_version = get_app_name(request.country, request.version) - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) + observation = GatewayObservation.from_request(observability, request) + + with observation.request_span("gateway.submit_simulation"): + observation.emit(stage="gateway.received", status="accepted") + try: + app_name, resolved_version = observation.call_stage( + "gateway.version_resolved", + lambda: get_app_name(request.country, request.version), + details={"requested_version": request.version}, + on_success=lambda result, details: details.update( + { + "version": result[1], + "modal_app_name": result[0], + } + ), + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + payload = request.model_dump( + exclude={"version", "telemetry"}, + mode="json", + ) + if request.telemetry is not None: + payload["_telemetry"] = dict(observation.telemetry or {}) + logger.info( + "Routing %s:%s to app %s (run_id=%s)", + request.country, + resolved_version, + app_name, + observation.run_id, + ) - payload = request.model_dump( - exclude={"version", "telemetry"}, - mode="json", - ) - run_id = request.telemetry.run_id if request.telemetry else None - if request.telemetry is not None: - payload["_telemetry"] = request.telemetry.model_dump(mode="json") - - logger.info( - "Routing %s:%s to app %s (run_id=%s)", - request.country, - resolved_version, - app_name, - run_id, - ) + def spawn_job(): + sim_func = modal.Function.from_name(app_name, "run_simulation") + call = sim_func.spawn(payload) + registry_payload = dict(observation.telemetry or {}) + registry_payload.update( + { + "job_id": call.object_id, + "country": request.country, + "country_package_version": resolved_version, + "modal_app_name": app_name, + } + ) + get_job_telemetry_registry()[call.object_id] = registry_payload + bundle = _build_policyengine_bundle( + request.country, + resolved_version, + payload, + ) + _job_metadata_store()[call.object_id] = _serialize_job_metadata( + app_name, + bundle, + observation.run_id, + ) + return call, bundle, registry_payload + + call, bundle, registry_payload = observation.call_stage( + "gateway.spawned", + spawn_job, + success_status="submitted", + record_failure_counter=True, + details={"version": resolved_version}, + on_success=lambda result, details: details.update( + {"job_id": result[0].object_id} + ), + ) - # Get function reference from the target app - sim_func = modal.Function.from_name(app_name, "run_simulation") - - # Spawn the job (returns immediately) - call = sim_func.spawn(payload) - - bundle = _build_policyengine_bundle(request.country, resolved_version, payload) - job_metadata = _serialize_job_metadata(app_name, bundle, run_id) - _job_metadata_store()[call.object_id] = job_metadata - - return JobSubmitResponse( - job_id=call.object_id, - status="submitted", - poll_url=f"/jobs/{call.object_id}", - country=request.country, - version=resolved_version, - resolved_app_name=app_name, - policyengine_bundle=bundle, - run_id=run_id, - ) + observation.counter( + "policyengine.simulation.run.count", + attributes=observation.metric_attributes( + registry_payload, + status="submitted", + ), + ) + + return JobSubmitResponse( + job_id=call.object_id, + status="submitted", + poll_url=f"/jobs/{call.object_id}", + country=request.country, + version=resolved_version, + resolved_app_name=app_name, + policyengine_bundle=bundle, + run_id=observation.run_id, + ) @router.get( @@ -171,38 +242,48 @@ async def get_job_status(job_id: str): - 500 with status="failed" and error on failure - 404 if job_id not found """ - try: - call = modal.FunctionCall.from_id(job_id) - except Exception: - raise HTTPException(status_code=404, detail=f"Job not found: {job_id}") + job_metadata, telemetry = _build_status_metadata(job_id) - job_metadata = _job_metadata_store().get(job_id) + observation = GatewayObservation.from_request( + observability, + telemetry=telemetry, + ) - try: - result = call.get(timeout=0) - return JobStatusResponse( - status="complete", result=result, **(job_metadata or {}) - ) - except TimeoutError: - return JSONResponse( - status_code=202, - content={ - "status": "running", - "result": None, - "error": None, - **(job_metadata or {}), - }, - ) - except Exception as e: - return JSONResponse( - status_code=500, - content={ - "status": "failed", - "result": None, - "error": str(e), - **(job_metadata or {}), - }, - ) + with observation.request_span("gateway.get_job_status"): + observation.emit(stage="result.polled", status="polling", job_id=job_id) + + try: + call = modal.FunctionCall.from_id(job_id) + except Exception: + raise HTTPException(status_code=404, detail=f"Job not found: {job_id}") + + try: + result = call.get(timeout=0) + return JobStatusResponse( + status="complete", + result=result, + **job_metadata, + ) + except TimeoutError: + return JSONResponse( + status_code=202, + content={ + "status": "running", + "result": None, + "error": None, + **job_metadata, + }, + ) + except Exception as e: + return JSONResponse( + status_code=500, + content={ + "status": "failed", + "result": None, + "error": str(e), + **job_metadata, + }, + ) @router.get("/versions") diff --git a/projects/policyengine-api-simulation/src/modal/gateway/models.py b/projects/policyengine-api-simulation/src/modal/gateway/models.py index 9da9a491e..05cde5a57 100644 --- a/projects/policyengine-api-simulation/src/modal/gateway/models.py +++ b/projects/policyengine-api-simulation/src/modal/gateway/models.py @@ -59,6 +59,7 @@ class JobStatusResponse(BaseModel): """Response model for job status polling.""" status: str + run_id: Optional[str] = None result: Optional[dict] = None error: Optional[str] = None resolved_app_name: Optional[str] = None diff --git a/projects/policyengine-api-simulation/src/modal/gateway/observation.py b/projects/policyengine-api-simulation/src/modal/gateway/observation.py new file mode 100644 index 000000000..a438a90a4 --- /dev/null +++ b/projects/policyengine-api-simulation/src/modal/gateway/observation.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +from contextlib import AbstractContextManager +from typing import Any, Callable, Mapping, TypeVar + +from src.modal.gateway.models import SimulationRequest +from src.modal.observability import ( + build_lifecycle_event, + build_metric_attributes, + build_span_attributes, + observe_stage, +) + +T = TypeVar("T") + + +class _RequestSpan(AbstractContextManager["_RequestSpan"]): + def __init__(self, observation: "GatewayObservation", name: str): + self.observation = observation + self.name = name + self._span_context = None + + def __enter__(self): + self._span_context = self.observation.observability.span( + self.name, + self.observation.span_attributes(country=self.observation.country), + parent_traceparent=self.observation.current_traceparent(), + ) + span = self._span_context.__enter__() + self.observation.update_telemetry(traceparent=span.get_traceparent()) + return self + + def __exit__(self, exc_type, exc_value, traceback): + if self._span_context is None: + return None + return self._span_context.__exit__(exc_type, exc_value, traceback) + + +class GatewayObservation: + def __init__( + self, + observability: Any, + *, + service: str, + telemetry: dict[str, Any] | None = None, + country: str | None = None, + ): + self.observability = observability + self.service = service + self.telemetry = telemetry + self.country = country + + @classmethod + def from_request( + cls, + observability: Any, + request: SimulationRequest | None = None, + telemetry: dict[str, Any] | None = None, + service: str = "policyengine-simulation-gateway", + ) -> "GatewayObservation": + if telemetry is None and request is not None and request.telemetry is not None: + telemetry = request.telemetry.model_dump(mode="json") + return cls( + observability, + service=service, + telemetry=telemetry, + country=None if request is None else request.country, + ) + + @property + def run_id(self) -> str | None: + return None if self.telemetry is None else self.telemetry.get("run_id") + + def update_telemetry(self, **fields: Any) -> None: + values = {key: value for key, value in fields.items() if value is not None} + if not values: + return + if self.telemetry is None: + self.telemetry = {} + self.telemetry.update(values) + + def current_traceparent(self) -> str | None: + return None if self.telemetry is None else self.telemetry.get("traceparent") + + def request_span(self, name: str) -> AbstractContextManager: + return _RequestSpan(self, name) + + def metric_attributes( + self, + telemetry: Mapping[str, Any] | None = None, + **extra: Any, + ) -> dict[str, Any]: + return build_metric_attributes( + self.telemetry if telemetry is None else telemetry, + service=self.service, + **extra, + ) + + def span_attributes( + self, + telemetry: Mapping[str, Any] | None = None, + **extra: Any, + ) -> dict[str, Any]: + return build_span_attributes( + self.telemetry if telemetry is None else telemetry, + service=self.service, + **extra, + ) + + def emit( + self, + *, + stage: str, + status: str, + duration_seconds: float | None = None, + details: Mapping[str, Any] | None = None, + **extra: Any, + ) -> None: + self.observability.emit_lifecycle_event( + build_lifecycle_event( + stage=stage, + status=status, + service=self.service, + telemetry=self.telemetry, + duration_seconds=duration_seconds, + details=details, + country=self.country, + **extra, + ) + ) + + def counter( + self, + name: str, + *, + attributes: Mapping[str, Any] | None = None, + value: int = 1, + ) -> None: + self.observability.emit_counter(name, value=value, attributes=attributes) + + def histogram( + self, + name: str, + value: float, + *, + attributes: Mapping[str, Any] | None = None, + ) -> None: + self.observability.emit_histogram(name, value, attributes=attributes) + + def stage( + self, + stage: str, + *, + success_status: str = "ok", + record_failure_counter: bool = False, + details: Mapping[str, Any] | None = None, + ) -> AbstractContextManager: + return observe_stage( + self.observability, + stage=stage, + service=self.service, + telemetry=self.telemetry, + success_status=success_status, + record_failure_counter=record_failure_counter, + details=details, + parent_traceparent=self.current_traceparent(), + ) + + def call_stage( + self, + stage: str, + fn: Callable[[], T], + *, + success_status: str = "ok", + record_failure_counter: bool = False, + details: Mapping[str, Any] | None = None, + on_success: Callable[[T, dict[str, Any]], None] | None = None, + ) -> T: + with self.stage( + stage, + success_status=success_status, + record_failure_counter=record_failure_counter, + details=details, + ) as observation: + result = fn() + if on_success is not None: + on_success(result, observation.details) + return result diff --git a/projects/policyengine-api-simulation/src/modal/observability.py b/projects/policyengine-api-simulation/src/modal/observability.py new file mode 100644 index 000000000..f542a978b --- /dev/null +++ b/projects/policyengine-api-simulation/src/modal/observability.py @@ -0,0 +1,249 @@ +""" +Simulation-specific observability helpers built on the shared OTLP runtime. +""" + +from __future__ import annotations + +from contextlib import AbstractContextManager +from datetime import UTC, datetime +from time import perf_counter +from typing import Any, Mapping + +from policyengine_fastapi.observability import ( + ObservabilityConfig, + build_observability as build_shared_observability, + build_traceparent as shared_build_traceparent, + parse_bool, + parse_header_value_pairs, +) + +__all__ = [ + "FAILURE_COUNT_METRIC_NAME", + "LOW_CARDINALITY_METRIC_KEYS", + "QUEUE_DURATION_METRIC_NAME", + "SPAN_ATTRIBUTE_KEYS", + "STAGE_DURATION_METRIC_NAME", + "StageObservation", + "build_lifecycle_event", + "build_metric_attributes", + "build_span_attributes", + "build_traceparent", + "duration_since_requested_at", + "get_observability", + "observe_stage", + "parse_bool", + "parse_header_value_pairs", +] + +LOW_CARDINALITY_METRIC_KEYS = ( + "country", + "simulation_kind", + "country_package_version", + "policyengine_version", + "capture_mode", +) +SPAN_ATTRIBUTE_KEYS = ( + "run_id", + "process_id", + "request_id", + "country", + "simulation_kind", + "geography_type", + "geography_code", + "country_package_name", + "country_package_version", + "policyengine_version", + "data_version", + "modal_app_name", + "config_hash", + "capture_mode", +) +STAGE_DURATION_METRIC_NAME = "policyengine.simulation.stage.duration.seconds" +QUEUE_DURATION_METRIC_NAME = "policyengine.simulation.queue.duration.seconds" +FAILURE_COUNT_METRIC_NAME = "policyengine.simulation.failure.count" + + +def build_metric_attributes( + telemetry: Mapping[str, Any] | None = None, + **extra: Any, +) -> dict[str, Any]: + attributes: dict[str, Any] = {} + if telemetry: + for key in LOW_CARDINALITY_METRIC_KEYS: + value = telemetry.get(key) + if value is not None: + attributes[key] = value + attributes.update({k: v for k, v in extra.items() if v is not None}) + return attributes + + +def build_span_attributes( + telemetry: Mapping[str, Any] | None = None, + **extra: Any, +) -> dict[str, Any]: + attributes: dict[str, Any] = {} + if telemetry: + for key in SPAN_ATTRIBUTE_KEYS: + value = telemetry.get(key) + if value is not None: + attributes[key] = value + attributes.update({k: v for k, v in extra.items() if v is not None}) + return attributes + + +def build_traceparent(span_context: Any) -> str | None: + return shared_build_traceparent(span_context) + + +def build_lifecycle_event( + *, + stage: str, + status: str, + service: str, + telemetry: Mapping[str, Any] | None = None, + duration_seconds: float | None = None, + details: Mapping[str, Any] | None = None, + **extra: Any, +) -> dict[str, Any]: + payload: dict[str, Any] = { + "event_name": "simulation.lifecycle", + "stage": stage, + "status": status, + "timestamp": datetime.now(UTC).isoformat(), + "service": service, + "details": dict(details or {}), + } + if duration_seconds is not None: + payload["duration_seconds"] = duration_seconds + if telemetry: + payload.update({k: v for k, v in telemetry.items() if v is not None}) + payload.update({k: v for k, v in extra.items() if v is not None}) + return payload + + +def duration_since_requested_at( + telemetry: Mapping[str, Any] | None, +) -> float | None: + requested_at = None if telemetry is None else telemetry.get("requested_at") + if not requested_at: + return None + try: + requested = datetime.fromisoformat(str(requested_at)) + except ValueError: + return None + return (datetime.now(UTC) - requested.astimezone(UTC)).total_seconds() + + +class StageObservation(AbstractContextManager["StageObservation"]): + def __init__( + self, + observability: Any, + *, + stage: str, + service: str, + telemetry: Mapping[str, Any] | None = None, + success_status: str = "ok", + record_failure_counter: bool = False, + details: Mapping[str, Any] | None = None, + parent_traceparent: str | None = None, + timer=perf_counter, + ): + self.observability = observability + self.stage = stage + self.service = service + self.telemetry = telemetry + self.success_status = success_status + self.record_failure_counter = record_failure_counter + self.details: dict[str, Any] = dict(details or {}) + self.parent_traceparent = parent_traceparent + self.timer = timer + self._start = 0.0 + self._span_context = None + self.span = None + + def __enter__(self): + self._start = self.timer() + self._span_context = self.observability.span( + self.stage, + build_span_attributes( + self.telemetry, + service=self.service, + stage=self.stage, + ), + parent_traceparent=self.parent_traceparent, + ) + self.span = self._span_context.__enter__() + return self + + def __exit__(self, exc_type, exc_value, traceback): + duration = max(0.0, self.timer() - self._start) + status = self.success_status if exc_type is None else "failed" + if exc_value is not None and "error" not in self.details: + self.details["error"] = str(exc_value) + + self.observability.emit_histogram( + STAGE_DURATION_METRIC_NAME, + duration, + attributes=build_metric_attributes( + self.telemetry, + service=self.service, + stage=self.stage, + status=status, + ), + ) + if exc_type is not None and self.record_failure_counter: + self.observability.emit_counter( + FAILURE_COUNT_METRIC_NAME, + attributes=build_metric_attributes( + self.telemetry, + service=self.service, + stage=self.stage, + status="failed", + ), + ) + self.observability.emit_lifecycle_event( + build_lifecycle_event( + stage=self.stage, + status=status, + service=self.service, + telemetry=self.telemetry, + duration_seconds=duration, + details=self.details, + ) + ) + if self._span_context is None: + return None + return self._span_context.__exit__(exc_type, exc_value, traceback) + + +def observe_stage( + observability: Any, + *, + stage: str, + service: str, + telemetry: Mapping[str, Any] | None = None, + success_status: str = "ok", + record_failure_counter: bool = False, + details: Mapping[str, Any] | None = None, + parent_traceparent: str | None = None, + timer=perf_counter, +) -> StageObservation: + return StageObservation( + observability, + stage=stage, + service=service, + telemetry=telemetry, + success_status=success_status, + record_failure_counter=record_failure_counter, + details=details, + parent_traceparent=parent_traceparent, + timer=timer, + ) + + +def get_observability( + service_name: str, + environment: str = "production", +): + config = ObservabilityConfig.from_env(service_name, environment) + return build_shared_observability(config) diff --git a/projects/policyengine-api-simulation/src/modal/simulation.py b/projects/policyengine-api-simulation/src/modal/simulation.py index a245e252c..2f143d3ef 100644 --- a/projects/policyengine-api-simulation/src/modal/simulation.py +++ b/projects/policyengine-api-simulation/src/modal/simulation.py @@ -9,10 +9,12 @@ import logging import os import tempfile +from typing import Any # Module-level imports - these are SNAPSHOTTED at image build time from policyengine.simulation import Simulation, SimulationOptions +from src.modal.observability import observe_stage from src.modal.telemetry import split_internal_payload logger = logging.getLogger(__name__) @@ -65,7 +67,11 @@ def setup_gcp_credentials(): logger.warning("No GCP credentials found in environment variables") -def run_simulation_impl(params: dict) -> dict: +def run_simulation_impl( + params: dict, + observability: Any = None, + telemetry_context: dict[str, Any] | None = None, +) -> dict: """ Execute economic simulation. @@ -73,9 +79,24 @@ def run_simulation_impl(params: dict) -> dict: Accepts SimulationOptions as a dict and returns EconomyComparison as a dict. """ # Set up GCP credentials if needed - setup_gcp_credentials() - simulation_params, telemetry, metadata = split_internal_payload(params) + event_telemetry = ( + telemetry.model_dump(mode="json") + if telemetry is not None + else telemetry_context + ) + + if observability is not None: + with observe_stage( + observability, + stage="worker.credentials.ready", + service="policyengine-simulation-worker", + telemetry=event_telemetry, + record_failure_counter=True, + ): + setup_gcp_credentials() + else: + setup_gcp_credentials() logger.info( "Starting simulation for country=%s run_id=%s process_id=%s", @@ -87,16 +108,57 @@ def run_simulation_impl(params: dict) -> dict: logger.info("Received simulation metadata keys: %s", sorted(metadata)) # Validate and create simulation options - options = SimulationOptions.model_validate(simulation_params) + if observability is not None: + with observe_stage( + observability, + stage="worker.options.validated", + service="policyengine-simulation-worker", + telemetry=event_telemetry, + record_failure_counter=True, + ): + options = SimulationOptions.model_validate(simulation_params) + else: + options = SimulationOptions.model_validate(simulation_params) logger.info("Initialising simulation from input") # Create simulation instance - simulation = Simulation(**options.model_dump()) + if observability is not None: + with observe_stage( + observability, + stage="worker.simulation.constructed", + service="policyengine-simulation-worker", + telemetry=event_telemetry, + record_failure_counter=True, + ): + simulation = Simulation(**options.model_dump()) + else: + simulation = Simulation(**options.model_dump()) logger.info("Calculating comparison") # Run the economy comparison calculation - result = simulation.calculate_economy_comparison() + if observability is not None: + with observe_stage( + observability, + stage="worker.comparison.calculated", + service="policyengine-simulation-worker", + telemetry=event_telemetry, + record_failure_counter=True, + ): + result = simulation.calculate_economy_comparison() + else: + result = simulation.calculate_economy_comparison() logger.info("Comparison complete") # Use mode='json' to ensure numpy arrays are converted to lists - return result.model_dump(mode="json") + if observability is not None: + with observe_stage( + observability, + stage="worker.result.serialized", + service="policyengine-simulation-worker", + telemetry=event_telemetry, + record_failure_counter=True, + ): + serialized = result.model_dump(mode="json") + else: + serialized = result.model_dump(mode="json") + return serialized diff --git a/projects/policyengine-api-simulation/tests/gateway/test_endpoints.py b/projects/policyengine-api-simulation/tests/gateway/test_endpoints.py index 65fec6ee2..2674a7266 100644 --- a/projects/policyengine-api-simulation/tests/gateway/test_endpoints.py +++ b/projects/policyengine-api-simulation/tests/gateway/test_endpoints.py @@ -9,6 +9,43 @@ from fastapi.testclient import TestClient +class RecordingSpan: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + return None + + def get_traceparent(self): + return None + + +class RecordingObservability: + def __init__(self): + self.events = [] + self.counters = [] + self.histograms = [] + + def emit_lifecycle_event(self, payload): + if hasattr(payload, "model_dump"): + self.events.append(payload.model_dump(mode="json")) + else: + self.events.append(payload) + + def emit_counter(self, name, value=1, attributes=None): + self.counters.append( + {"name": name, "value": value, "attributes": dict(attributes or {})} + ) + + def emit_histogram(self, name, value, attributes=None): + self.histograms.append( + {"name": name, "value": value, "attributes": dict(attributes or {})} + ) + + def span(self, name, attributes=None, parent_traceparent=None): + return RecordingSpan() + + class TestGetAppName: """Tests for the get_app_name helper function.""" @@ -229,6 +266,12 @@ def test__given_submission_with_telemetry__then_preserves_run_id( data = response.json() assert data["run_id"] == "run-123" assert mock_modal["func"].last_payload["_telemetry"]["run_id"] == "run-123" + assert ( + mock_modal["dicts"]["simulation-api-job-telemetry"]["mock-job-id-123"][ + "run_id" + ] + == "run-123" + ) def test__given_submission_with_data__then_returns_resolved_bundle_metadata( self, mock_modal, client: TestClient @@ -396,3 +439,48 @@ def test__given_submitted_job_with_telemetry__then_polling_echoes_run_id( assert response.status_code == 200 assert response.json()["run_id"] == "run-123" + + def test__given_repeated_terminal_polls__then_gateway_does_not_reemit_terminal_telemetry( + self, mock_modal, client: TestClient, monkeypatch + ): + from src.modal.gateway import endpoints + + recorder = RecordingObservability() + monkeypatch.setattr(endpoints, "observability", recorder) + + mock_modal["dicts"]["simulation-api-us-versions"] = { + "latest": "1.500.0", + "1.500.0": "policyengine-simulation-us1-500-0-uk2-66-0", + } + + submit_response = client.post( + "/simulate/economy/comparison", + json={ + "country": "us", + "scope": "macro", + "reform": {}, + "_telemetry": { + "run_id": "run-123", + "process_id": "proc-123", + "capture_mode": "disabled", + }, + }, + ) + + job_id = submit_response.json()["job_id"] + event_count_before_polls = len(recorder.events) + histogram_count_before_polls = len(recorder.histograms) + counter_count_before_polls = len(recorder.counters) + + first_poll = client.get(f"/jobs/{job_id}") + second_poll = client.get(f"/jobs/{job_id}") + + assert first_poll.status_code == 200 + assert second_poll.status_code == 200 + poll_events = recorder.events[event_count_before_polls:] + assert [event["stage"] for event in poll_events] == [ + "result.polled", + "result.polled", + ] + assert len(recorder.histograms) == histogram_count_before_polls + assert len(recorder.counters) == counter_count_before_polls diff --git a/projects/policyengine-api-simulation/tests/gateway/test_observation.py b/projects/policyengine-api-simulation/tests/gateway/test_observation.py new file mode 100644 index 000000000..95eb768b4 --- /dev/null +++ b/projects/policyengine-api-simulation/tests/gateway/test_observation.py @@ -0,0 +1,119 @@ +from src.modal.gateway.models import SimulationRequest +from src.modal.gateway.observation import GatewayObservation + + +class RecordingSpan: + def __init__(self, traceparent: str | None = None): + self.traceparent = traceparent + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + return None + + def get_traceparent(self) -> str | None: + return self.traceparent + + +class RecordingObservability: + def __init__(self): + self.events = [] + self.counters = [] + self.histograms = [] + self.spans = [] + + def emit_lifecycle_event(self, payload): + if hasattr(payload, "model_dump"): + self.events.append(payload.model_dump(mode="json")) + else: + self.events.append(payload) + + def emit_counter(self, name, value=1, attributes=None): + self.counters.append( + {"name": name, "value": value, "attributes": dict(attributes or {})} + ) + + def emit_histogram(self, name, value, attributes=None): + self.histograms.append( + {"name": name, "value": value, "attributes": dict(attributes or {})} + ) + + def span(self, name, attributes=None, parent_traceparent=None): + self.spans.append( + { + "name": name, + "attributes": dict(attributes or {}), + "parent_traceparent": parent_traceparent, + } + ) + return RecordingSpan("00-11111111111111111111111111111111-2222222222222222-01") + + +def test_gateway_observation__request_span_updates_traceparent(): + request = SimulationRequest.model_validate( + { + "country": "us", + "scope": "macro", + "reform": {}, + "_telemetry": { + "run_id": "run-123", + "process_id": "proc-123", + }, + } + ) + observability = RecordingObservability() + + observation = GatewayObservation.from_request(observability, request) + + with observation.request_span("gateway.submit_simulation"): + pass + + assert observation.telemetry is not None + assert ( + observation.telemetry["traceparent"] + == "00-11111111111111111111111111111111-2222222222222222-01" + ) + assert observability.spans[0]["attributes"]["country"] == "us" + assert observability.spans[0]["attributes"]["run_id"] == "run-123" + + +def test_gateway_observation__call_stage_records_success_and_detail_updates(): + observability = RecordingObservability() + observation = GatewayObservation( + observability, + service="policyengine-simulation-gateway", + telemetry={ + "run_id": "run-123", + "country": "us", + "simulation_kind": "national", + "traceparent": "00-parenttraceparent-1111111111111111-01", + }, + country="us", + ) + + result = observation.call_stage( + "gateway.version_resolved", + lambda: ("policyengine-simulation-us1-2-3-uk2-3-4", "1.2.3"), + details={"requested_version": None}, + on_success=lambda outcome, details: details.update( + { + "modal_app_name": outcome[0], + "version": outcome[1], + } + ), + ) + + assert result == ("policyengine-simulation-us1-2-3-uk2-3-4", "1.2.3") + assert observability.spans[0]["name"] == "gateway.version_resolved" + assert ( + observability.spans[0]["parent_traceparent"] + == "00-parenttraceparent-1111111111111111-01" + ) + assert observability.events[0]["stage"] == "gateway.version_resolved" + assert observability.events[0]["status"] == "ok" + assert observability.events[0]["details"] == { + "requested_version": None, + "modal_app_name": "policyengine-simulation-us1-2-3-uk2-3-4", + "version": "1.2.3", + } diff --git a/projects/policyengine-api-simulation/tests/test_modal_observability.py b/projects/policyengine-api-simulation/tests/test_modal_observability.py new file mode 100644 index 000000000..717659f36 --- /dev/null +++ b/projects/policyengine-api-simulation/tests/test_modal_observability.py @@ -0,0 +1,221 @@ +from datetime import UTC, datetime, timedelta + +import pytest + +from src.modal.observability import ( + build_metric_attributes, + build_span_attributes, + build_traceparent, + duration_since_requested_at, + FAILURE_COUNT_METRIC_NAME, + observe_stage, + parse_bool, + parse_header_value_pairs, + STAGE_DURATION_METRIC_NAME, +) + + +def test_parse_header_value_pairs__supports_commas_and_newlines(): + assert parse_header_value_pairs("Authorization=Bearer abc,\nX-Scope=ops") == { + "Authorization": "Bearer abc", + "X-Scope": "ops", + } + + +def test_parse_header_value_pairs__rejects_invalid_entries(): + with pytest.raises(ValueError, match="key=value"): + parse_header_value_pairs("Authorization") + + +def test_parse_bool__supports_common_truthy_and_falsey_values(): + assert parse_bool("true") is True + assert parse_bool("YES") is True + assert parse_bool("0") is False + assert parse_bool(None, default=True) is True + + +def test_build_metric_attributes__keeps_low_cardinality_fields(): + attributes = build_metric_attributes( + { + "country": "us", + "simulation_kind": "state", + "geography_type": "state", + "geography_code": "tx", + "capture_mode": "disabled", + "country_package_version": "1.632.5", + "policyengine_version": "0.13.0", + "modal_app_name": "policyengine-simulation-us1-632-5-uk2-78-0", + "run_id": "run-123", + }, + service="policyengine-simulation-worker", + status="complete", + ) + + assert attributes == { + "country": "us", + "simulation_kind": "state", + "capture_mode": "disabled", + "country_package_version": "1.632.5", + "policyengine_version": "0.13.0", + "service": "policyengine-simulation-worker", + "status": "complete", + } + + +def test_build_span_attributes__preserves_high_cardinality_trace_context(): + attributes = build_span_attributes( + { + "run_id": "run-123", + "process_id": "proc-123", + "country": "us", + "simulation_kind": "state", + "geography_type": "state", + "geography_code": "tx", + "config_hash": "sha256:test", + }, + service="policyengine-simulation-worker", + ) + + assert attributes == { + "run_id": "run-123", + "process_id": "proc-123", + "country": "us", + "simulation_kind": "state", + "geography_type": "state", + "geography_code": "tx", + "config_hash": "sha256:test", + "service": "policyengine-simulation-worker", + } + + +def test_duration_since_requested_at__returns_elapsed_seconds(): + requested_at = (datetime.now(UTC) - timedelta(seconds=5)).isoformat() + + duration = duration_since_requested_at({"requested_at": requested_at}) + + assert duration is not None + assert 0 < duration < 30 + + +def test_build_traceparent__formats_w3c_traceparent(): + class FakeSpanContext: + is_valid = True + trace_id = 0x1234 + span_id = 0x5678 + trace_flags = 1 + + assert ( + build_traceparent(FakeSpanContext()) + == "00-00000000000000000000000000001234-0000000000005678-01" + ) + + +class RecordingSpan: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + return None + + def set_attribute(self, key, value): + return None + + def add_event(self, name, attributes=None): + return None + + +class RecordingObservability: + def __init__(self): + self.events = [] + self.histograms = [] + self.counters = [] + self.spans = [] + + def emit_lifecycle_event(self, payload): + if hasattr(payload, "model_dump"): + self.events.append(payload.model_dump(mode="json")) + else: + self.events.append(payload) + + def emit_counter(self, name, value=1, attributes=None): + self.counters.append( + {"name": name, "value": value, "attributes": dict(attributes or {})} + ) + + def emit_histogram(self, name, value, attributes=None): + self.histograms.append( + {"name": name, "value": value, "attributes": dict(attributes or {})} + ) + + def span(self, name, attributes=None, parent_traceparent=None): + self.spans.append( + { + "name": name, + "attributes": dict(attributes or {}), + "parent_traceparent": parent_traceparent, + } + ) + return RecordingSpan() + + +def test_observe_stage__records_duration_metrics_and_lifecycle_events(): + observability = RecordingObservability() + ticks = iter((10.0, 12.5)) + + with observe_stage( + observability, + stage="worker.options.validated", + service="policyengine-simulation-worker", + telemetry={"country": "us", "simulation_kind": "state"}, + timer=lambda: next(ticks), + ): + pass + + assert observability.spans[0]["name"] == "worker.options.validated" + assert observability.histograms == [ + { + "name": STAGE_DURATION_METRIC_NAME, + "value": 2.5, + "attributes": { + "country": "us", + "simulation_kind": "state", + "service": "policyengine-simulation-worker", + "stage": "worker.options.validated", + "status": "ok", + }, + } + ] + assert observability.events[0]["stage"] == "worker.options.validated" + assert observability.events[0]["duration_seconds"] == 2.5 + + +def test_observe_stage__records_failure_metrics_on_exception(): + observability = RecordingObservability() + ticks = iter((20.0, 21.25)) + + with pytest.raises(RuntimeError, match="boom"): + with observe_stage( + observability, + stage="worker.comparison.calculated", + service="policyengine-simulation-worker", + telemetry={"country": "us", "simulation_kind": "state"}, + record_failure_counter=True, + timer=lambda: next(ticks), + ): + raise RuntimeError("boom") + + assert observability.histograms[0]["attributes"]["status"] == "failed" + assert observability.counters == [ + { + "name": FAILURE_COUNT_METRIC_NAME, + "value": 1, + "attributes": { + "country": "us", + "simulation_kind": "state", + "service": "policyengine-simulation-worker", + "stage": "worker.comparison.calculated", + "status": "failed", + }, + } + ] + assert observability.events[0]["details"]["error"] == "boom" diff --git a/projects/policyengine-api-simulation/uv.lock b/projects/policyengine-api-simulation/uv.lock index 34e79ea26..bd8c6f17b 100644 --- a/projects/policyengine-api-simulation/uv.lock +++ b/projects/policyengine-api-simulation/uv.lock @@ -725,7 +725,9 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7a/75/7e9cd1126a1e1f0cd67b0eda02e5221b28488d352684704a78ed505bd719/greenlet-3.4.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:43748988b097f9c6f09364f260741aa73c80747f63389824435c7a50bfdfd5c1", size = 285856, upload-time = "2026-04-08T15:52:45.82Z" }, { url = "https://files.pythonhosted.org/packages/9d/c4/3e2df392e5cb199527c4d9dbcaa75c14edcc394b45040f0189f649631e3c/greenlet-3.4.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:5566e4e2cd7a880e8c27618e3eab20f3494452d12fd5129edef7b2f7aa9a36d1", size = 610208, upload-time = "2026-04-08T16:24:39.674Z" }, { url = "https://files.pythonhosted.org/packages/da/af/750cdfda1d1bd30a6c28080245be8d0346e669a98fdbae7f4102aa95fff3/greenlet-3.4.0-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:1054c5a3c78e2ab599d452f23f7adafef55062a783a8e241d24f3b633ba6ff82", size = 621269, upload-time = "2026-04-08T16:30:59.767Z" }, + { url = "https://files.pythonhosted.org/packages/e0/93/c8c508d68ba93232784bbc1b5474d92371f2897dfc6bc281b419f2e0d492/greenlet-3.4.0-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:98eedd1803353daf1cd9ef23eef23eda5a4d22f99b1f998d273a8b78b70dd47f", size = 628455, upload-time = "2026-04-08T16:40:40.698Z" }, { url = "https://files.pythonhosted.org/packages/54/78/0cbc693622cd54ebe25207efbb3a0eb07c2639cb8594f6e3aaaa0bb077a8/greenlet-3.4.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f82cb6cddc27dd81c96b1506f4aa7def15070c3b2a67d4e46fd19016aacce6cf", size = 617549, upload-time = "2026-04-08T15:56:34.893Z" }, + { url = "https://files.pythonhosted.org/packages/7f/46/cfaaa0ade435a60550fd83d07dfd5c41f873a01da17ede5c4cade0b9bab8/greenlet-3.4.0-cp313-cp313-manylinux_2_39_riscv64.whl", hash = "sha256:b7857e2202aae67bc5725e0c1f6403c20a8ff46094ece015e7d474f5f7020b55", size = 426238, upload-time = "2026-04-08T16:43:06.865Z" }, { url = "https://files.pythonhosted.org/packages/ba/c0/8966767de01343c1ff47e8b855dc78e7d1a8ed2b7b9c83576a57e289f81d/greenlet-3.4.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:227a46251ecba4ff46ae742bc5ce95c91d5aceb4b02f885487aff269c127a729", size = 1575310, upload-time = "2026-04-08T16:26:21.671Z" }, { url = "https://files.pythonhosted.org/packages/b8/38/bcdc71ba05e9a5fda87f63ffc2abcd1f15693b659346df994a48c968003d/greenlet-3.4.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:5b99e87be7eba788dd5b75ba1cde5639edffdec5f91fe0d734a249535ec3408c", size = 1640435, upload-time = "2026-04-08T15:57:32.572Z" }, { url = "https://files.pythonhosted.org/packages/a1/c2/19b664b7173b9e4ef5f77e8cef9f14c20ec7fce7920dc1ccd7afd955d093/greenlet-3.4.0-cp313-cp313-win_amd64.whl", hash = "sha256:849f8bc17acd6295fcb5de8e46d55cc0e52381c56eaf50a2afd258e97bc65940", size = 238760, upload-time = "2026-04-08T17:04:03.878Z" }, @@ -1590,7 +1592,7 @@ name = "pexpect" version = "4.9.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "ptyprocess" }, + { name = "ptyprocess", marker = "sys_platform != 'emscripten' and sys_platform != 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/42/92/cc564bf6381ff43ce1f4d06852fc19a2f11d180f23dc32d9588bee2f149d/pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f", size = 166450, upload-time = "2023-11-25T09:07:26.339Z" } wheels = [ @@ -1679,6 +1681,7 @@ dependencies = [ { name = "fastapi", extra = ["standard"] }, { name = "opentelemetry-exporter-gcp-monitoring" }, { name = "opentelemetry-exporter-gcp-trace" }, + { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-instrumentation-fastapi" }, { name = "opentelemetry-instrumentation-logging" }, { name = "opentelemetry-instrumentation-sqlalchemy" }, @@ -1695,6 +1698,7 @@ requires-dist = [ { name = "fastapi", extras = ["standard"], specifier = ">=0.115.8,<0.116.0" }, { name = "opentelemetry-exporter-gcp-monitoring", specifier = ">=1.9.0a0,<2.0.0" }, { name = "opentelemetry-exporter-gcp-trace", specifier = ">=1.9.0,<2.0.0" }, + { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.30.0,<2.0.0" }, { name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.51b0,<0.52" }, { name = "opentelemetry-instrumentation-logging", specifier = ">=0.51b0,<0.52" }, { name = "opentelemetry-instrumentation-sqlalchemy", specifier = ">=0.51b0" },