diff --git a/projects/policyengine-api-simulation/README.md b/projects/policyengine-api-simulation/README.md index 74149a97b..9aa3c3031 100644 --- a/projects/policyengine-api-simulation/README.md +++ b/projects/policyengine-api-simulation/README.md @@ -30,4 +30,44 @@ Operational notes: `run_function` call in `src/modal/app.py` for one deploy. - Only pure default requests read the baked folder: any request naming an explicit `data` dataset or pinning a `data_version` bypasses it (see - `_load_dataset` in `simulation_runtime.py`). \ No newline at end of file + `_load_dataset` in `simulation_runtime.py`). + +## Observability + +The service currently runs two observability backends in parallel: + +- `policyengine-observability` emits structured request, operation, error, + and runtime timing logs. +- Logfire remains enabled as the legacy platform for existing dashboards and + alerting while we evaluate replacing it with another observability platform. + +New instrumentation should be added through `policyengine-observability`; the +Logfire path is retained for continuity during that evaluation. + +For `policyengine-observability`, this service intentionally forces: + +- `log_destinations=("stdout",)` +- `otel_enabled=False` +- `google_cloud_project=None` + +Cloud Logging and OTel export are therefore disabled until the target GCP +project is ready. The package does not currently provide memory-usage +measurements, so memory is not emitted. + +Modal captures container output and exposes it through the app logs UI and +CLI. Useful `policyengine-observability` checks after deploying: + +```bash +modal app logs policyengine-simulation-gateway --tail 100 +modal app logs policyengine-simulation-gateway --tail 100 --search policyengine.observability +modal app logs policyengine-simulation-py --tail 100 --search run_simulation +modal app dashboard policyengine-simulation-gateway +``` + +If using Modal source filters, include both `stdout` and `stderr`. 
The +observability destination is named `stdout`, but its current Python logging +handler writes through the standard `logging.StreamHandler` (default stream: `stderr`). + +Logfire continues to use the `policyengine-logfire` Modal secret. Worker +functions and the gateway configure Logfire only when `LOGFIRE_TOKEN` is +present. diff --git a/projects/policyengine-api-simulation/fixtures/gateway/shared.py b/projects/policyengine-api-simulation/fixtures/gateway/shared.py index 0707f6cf5..73e732092 100644 --- a/projects/policyengine-api-simulation/fixtures/gateway/shared.py +++ b/projects/policyengine-api-simulation/fixtures/gateway/shared.py @@ -4,6 +4,9 @@ from fastapi import FastAPI from fastapi.testclient import TestClient +from policyengine_api_simulation.observability import ( + init_simulation_observability, +) from src.modal.gateway.auth import require_auth from src.modal.gateway.endpoints import router @@ -21,6 +24,7 @@ def create_gateway_app(*, authenticate: bool = True) -> FastAPI: description="Test instance for unit tests", version="0.0.1", ) + init_simulation_observability(app, service_role="modal_gateway") app.include_router(router) if authenticate: app.dependency_overrides[require_auth] = lambda: None diff --git a/projects/policyengine-api-simulation/fixtures/test_logfire_integration.py b/projects/policyengine-api-simulation/fixtures/test_logfire_integration.py deleted file mode 100644 index c94bb6c1a..000000000 --- a/projects/policyengine-api-simulation/fixtures/test_logfire_integration.py +++ /dev/null @@ -1,22 +0,0 @@ -"""Fixtures for logfire integration tests.""" - -from unittest.mock import MagicMock - -import pytest - - -@pytest.fixture -def mock_span(): - """Create a mock span with context manager support.""" - span = MagicMock() - span.__enter__ = MagicMock(return_value=span) - span.__exit__ = MagicMock(return_value=None) - return span - - -@pytest.fixture -def mock_logfire(mock_span): - """Create a mock logfire module.""" - logfire = MagicMock() - logfire.span.return_value = 
mock_span - return logfire diff --git a/projects/policyengine-api-simulation/pyproject.toml b/projects/policyengine-api-simulation/pyproject.toml index fa2e2b0c0..21f382f1c 100644 --- a/projects/policyengine-api-simulation/pyproject.toml +++ b/projects/policyengine-api-simulation/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ "policyengine-us==1.752.2", "tables>=3.10.2", "modal>=0.73.0", + "policyengine-observability[fastapi]>=1.3.0,<2", "logfire>=3.0.0", ] diff --git a/projects/policyengine-api-simulation/src/modal/app.py b/projects/policyengine-api-simulation/src/modal/app.py index 0ccaa87dd..b57666745 100644 --- a/projects/policyengine-api-simulation/src/modal/app.py +++ b/projects/policyengine-api-simulation/src/modal/app.py @@ -10,9 +10,22 @@ import modal import os +from policyengine_observability import operation, set_attribute + from src.modal._image_setup import prebuild_country_datasets, snapshot_models from src.modal.dependency_pins import project_dependency_pin +from src.modal.logfire_legacy import ( + configure_logfire, + flush_logfire, + legacy_logfire_attributes, + logfire_span, +) from src.modal.logging_redaction import redact_params_for_logging +from policyengine_api_simulation.observability import ( + configure_process_observability, + init_process_observability, + process_static_attributes, +) from policyengine_api_simulation.release_bundle import get_bundled_country_model_version @@ -89,7 +102,7 @@ def get_app_name(policyengine_version: str) -> str: gcp_secret = modal.Secret.from_name("gcp-credentials", environment_name="main") data_secret = modal.Secret.from_name("policyengine-data-credentials") hf_secret = modal.Secret.from_name("huggingface-token") -# Logfire secret is environment-specific +# Legacy Logfire export remains while we evaluate a replacement observability platform. 
logfire_secret = modal.Secret.from_name("policyengine-logfire") @@ -130,7 +143,8 @@ def build_base_simulation_image() -> modal.Image: "uv", "fastapi>=0.115.0", "tables>=3.10.2", - "logfire", + "logfire>=3.0.0", + "policyengine-observability[fastapi]>=1.3.0,<2", ) .run_commands( bundle_install_command(POLICYENGINE_VERSION), @@ -169,20 +183,28 @@ def build_base_simulation_image() -> modal.Image: ) -def configure_logfire(service_name: str = "policyengine-simulation"): - """Configure Logfire for observability. Call at start of each function.""" - import logfire +def _configure_modal_observability( + *, + service_role: str, + modal_function_name: str, +) -> dict: + configure_process_observability( + platform="modal", + service_role=service_role, + modal_app_name=APP_NAME, + modal_function_name=modal_function_name, + ) + init_process_observability(service_role=service_role) + # Worker operations have no FastAPI adapter to inject static identity + # attributes, so the caller must merge these into its operation attrs. + return process_static_attributes(service_role=service_role) - token = os.environ.get("LOGFIRE_TOKEN", "") - if not token: - return - logfire.configure( - service_name=service_name, - token=token, - environment=os.environ.get("LOGFIRE_ENVIRONMENT", "production"), - console=False, - ) +def _set_modal_call_attributes() -> None: + try: + set_attribute("function_call_id", modal.current_function_call_id()) + except Exception: + pass @app.function( @@ -199,28 +221,37 @@ def run_simulation(params: dict) -> dict: Execute economic simulation. Imports the snapshotted implementation at runtime. - Logs input params and output result to Logfire for observability. + Emits redacted operation data to both observability systems. 
""" - import logfire - - from policyengine_api_simulation.simulation_runtime import run_simulation_impl - - configure_logfire() + static_attributes = _configure_modal_observability( + service_role="simulation_worker", + modal_function_name="run_simulation", + ) # We deliberately avoid sending full ``params`` or ``result`` blobs to - # Logfire: both can embed signed URLs, reform parameter trees with - # sensitive policy details, or result payloads large enough to blow the - # span attribute size budget. The redacted summary keeps correlation - # traceability via run_id while leaving the heavy payload in memory. - redacted_params = redact_params_for_logging(params) + # either observability system: both can embed signed URLs, reform + # parameter trees with sensitive policy details, or result payloads + # large enough to blow attribute budgets. The redacted summary keeps + # correlation traceability via run_id while leaving the heavy payload + # in memory. + redacted_params = { + **redact_params_for_logging(params), + **static_attributes, + **legacy_logfire_attributes(), + } + logfire_enabled = False try: - with logfire.span( - "run_simulation", - **redacted_params, - ): - return run_simulation_impl(params) + with operation("run_simulation", flavor="modal_function", **redacted_params): + logfire_enabled = configure_logfire("policyengine-simulation") + _set_modal_call_attributes() + with logfire_span(logfire_enabled, "run_simulation", **redacted_params): + from policyengine_api_simulation.simulation_runtime import ( + run_simulation_impl, + ) + + return run_simulation_impl(params) finally: - logfire.force_flush() + flush_logfire(logfire_enabled) @app.function( @@ -234,18 +265,32 @@ def run_simulation(params: dict) -> dict: ) def run_budget_window_batch(params: dict) -> dict: """Execute a multi-year budget-window batch orchestration.""" - import logfire - - from src.modal.budget_window_batch import run_budget_window_batch_impl - - configure_logfire() + static_attributes 
= _configure_modal_observability( + service_role="budget_window_worker", + modal_function_name="run_budget_window_batch", + ) - redacted_params = redact_params_for_logging(params) + redacted_params = { + **redact_params_for_logging(params), + **static_attributes, + **legacy_logfire_attributes(), + } + logfire_enabled = False try: - with logfire.span( + with operation( "run_budget_window_batch", + flavor="modal_function", **redacted_params, ): - return run_budget_window_batch_impl(params) + logfire_enabled = configure_logfire("policyengine-simulation") + _set_modal_call_attributes() + with logfire_span( + logfire_enabled, + "run_budget_window_batch", + **redacted_params, + ): + from src.modal.budget_window_batch import run_budget_window_batch_impl + + return run_budget_window_batch_impl(params) finally: - logfire.force_flush() + flush_logfire(logfire_enabled) diff --git a/projects/policyengine-api-simulation/src/modal/budget_window_batch.py b/projects/policyengine-api-simulation/src/modal/budget_window_batch.py index 538a76685..12cd139a0 100644 --- a/projects/policyengine-api-simulation/src/modal/budget_window_batch.py +++ b/projects/policyengine-api-simulation/src/modal/budget_window_batch.py @@ -5,15 +5,20 @@ from typing import Any import modal +from policyengine_observability import segment, set_attribute from src.modal.budget_window_context import build_batch_context from src.modal.budget_window_scheduler import BudgetWindowBatchRunner +from policyengine_api_simulation.observability import SegmentName def run_budget_window_batch_impl(params: dict[str, Any]) -> dict[str, Any]: - context = build_batch_context( - params, - batch_job_id=modal.current_function_call_id(), - ) + batch_job_id = modal.current_function_call_id() + set_attribute("batch_job_id", batch_job_id) + with segment(SegmentName.BUDGET_WINDOW_CONTEXT): + context = build_batch_context( + params, + batch_job_id=batch_job_id, + ) runner = BudgetWindowBatchRunner(context) return runner.run() diff --git 
a/projects/policyengine-api-simulation/src/modal/budget_window_scheduler.py b/projects/policyengine-api-simulation/src/modal/budget_window_scheduler.py index 1a20887e0..81e1da93b 100644 --- a/projects/policyengine-api-simulation/src/modal/budget_window_scheduler.py +++ b/projects/policyengine-api-simulation/src/modal/budget_window_scheduler.py @@ -6,6 +6,7 @@ from typing import Any import modal +from policyengine_observability import segment, set_attribute from src.modal.budget_window_context import ( BudgetWindowBatchContext, @@ -30,6 +31,7 @@ put_batch_job_state, ) from src.modal.gateway.errors import log_and_redact_exception +from policyengine_api_simulation.observability import SegmentName # Polling tuning. The runner busy-loops across child FunctionCall.get(timeout=0) # probes; when no child resolved we sleep before the next probe to stop the @@ -49,11 +51,13 @@ def serialize_batch_status(state) -> dict[str, Any]: - return build_batch_status_response(state).model_dump(mode="json") + with segment(SegmentName.BUDGET_WINDOW_STATUS_SERIALIZATION): + return build_batch_status_response(state).model_dump(mode="json") def load_or_create_batch_state(context: BudgetWindowBatchContext): - state = get_batch_job_seed(context.batch_job_id) + with segment(SegmentName.BUDGET_WINDOW_STATE_LOAD): + state = get_batch_job_seed(context.batch_job_id) if state is None: state = create_initial_batch_state( batch_job_id=context.batch_job_id, @@ -62,7 +66,8 @@ def load_or_create_batch_state(context: BudgetWindowBatchContext): resolved_app_name=context.resolved_app_name, bundle=context.bundle, ) - put_batch_job_seed(state) + with segment(SegmentName.BUDGET_WINDOW_STATE_WRITE): + put_batch_job_seed(state) return state @@ -85,16 +90,32 @@ def __init__( self.poll_interval_backoff_factor = poll_interval_backoff_factor # Kept for tests that still read this attribute. 
self.poll_interval_seconds = poll_interval_seconds + set_attribute("batch_job_id", context.batch_job_id) + set_attribute("resolved_app_name", context.resolved_app_name) + set_attribute("resolved_version", context.resolved_version) self.state = load_or_create_batch_state(context) + # ``Function.from_name`` is a lazy handle (no RPC); hydration happens + # on first spawn, which BUDGET_WINDOW_CHILD_SPAWN already times. self.child_func = self.modal.Function.from_name( context.resolved_app_name, "run_simulation", ) self.child_handles: dict[str, ChildSimulationHandle] = {} + # Poll/sleep totals are tracked as running aggregates instead of one + # segment per loop iteration: a segment appends a node to the + # operation's in-memory segment tree, and a near-timeout batch can + # poll thousands of times — unbounded memory plus a final operation + # log line large enough for Modal's log pipeline to truncate. + # ``set_attribute`` overwrites, so publishing these stays bounded. + self._child_poll_seconds = 0.0 + self._child_poll_count = 0 + self._backoff_sleep_seconds = 0.0 + self._backoff_sleep_count = 0 def run(self) -> dict[str, Any]: mark_batch_running(self.state) - put_batch_job_state(self.state) + with segment(SegmentName.BUDGET_WINDOW_STATE_WRITE): + put_batch_job_state(self.state) # Exponential backoff: reset on any progress, double on empty polls. 
current_sleep = self.poll_interval_initial_seconds @@ -106,6 +127,9 @@ def run(self) -> dict[str, Any]: return serialize_batch_status(self.state) if self.state.running_years and not progress_made: time.sleep(current_sleep) + self._backoff_sleep_seconds += current_sleep + self._backoff_sleep_count += 1 + self._publish_poll_stats() current_sleep = min( current_sleep * self.poll_interval_backoff_factor, self.poll_interval_max_seconds, @@ -124,11 +148,19 @@ def spawn_until_capacity(self) -> None: and self.state.queued_years ): simulation_year = self.state.queued_years[0] - child_request = build_child_simulation_request( - self.context, + with segment( + SegmentName.BUDGET_WINDOW_CHILD_REQUEST_BUILD, simulation_year=simulation_year, - ) - call = self.child_func.spawn(child_request.payload) + ): + child_request = build_child_simulation_request( + self.context, + simulation_year=simulation_year, + ) + with segment( + SegmentName.BUDGET_WINDOW_CHILD_SPAWN, + simulation_year=simulation_year, + ): + call = self.child_func.spawn(child_request.payload) self.child_handles[simulation_year] = ChildSimulationHandle( simulation_year=simulation_year, job_id=call.object_id, @@ -139,7 +171,8 @@ def spawn_until_capacity(self) -> None: year=simulation_year, child_job_id=call.object_id, ) - put_batch_job_state(self.state) + with segment(SegmentName.BUDGET_WINDOW_STATE_WRITE): + put_batch_job_state(self.state) def poll_running_children_once(self) -> bool: progress_made = False @@ -147,6 +180,7 @@ def poll_running_children_once(self) -> bool: for simulation_year in list(self.state.running_years): handle = self.resolve_child_handle(simulation_year) + poll_started = time.perf_counter() try: child_result = handle.call.get(timeout=0) except TimeoutError: @@ -165,12 +199,20 @@ def poll_running_children_once(self) -> bool: error=redacted, ) return False + finally: + self._child_poll_seconds += time.perf_counter() - poll_started + self._child_poll_count += 1 + self._publish_poll_stats() try: - 
annual_impact = extract_annual_impact( + with segment( + SegmentName.BUDGET_WINDOW_RESULT_PARSE, simulation_year=simulation_year, - child_result=child_result, - ) + ): + annual_impact = extract_annual_impact( + simulation_year=simulation_year, + child_result=child_result, + ) except Exception as exc: redacted = log_and_redact_exception( exc, @@ -191,7 +233,8 @@ def poll_running_children_once(self) -> bool: year=simulation_year, annual_impact=annual_impact, ) - put_batch_job_state(self.state) + with segment(SegmentName.BUDGET_WINDOW_STATE_WRITE): + put_batch_job_state(self.state) progress_made = True return progress_made @@ -202,6 +245,9 @@ def resolve_child_handle(self, simulation_year: str) -> ChildSimulationHandle: return handle job_id = self.state.child_jobs[simulation_year].job_id + # ``FunctionCall.from_id`` is a lazy handle (no RPC), so there is + # nothing meaningful to time here; the network cost lands in the + # subsequent ``call.get`` poll. call = self.modal.FunctionCall.from_id(job_id) resolved_handle = ChildSimulationHandle( simulation_year=simulation_year, @@ -211,6 +257,18 @@ def resolve_child_handle(self, simulation_year: str) -> ChildSimulationHandle: self.child_handles[simulation_year] = resolved_handle return resolved_handle + def _publish_poll_stats(self) -> None: + set_attribute("child_poll_count", self._child_poll_count) + set_attribute( + "child_poll_ms_total", + round(self._child_poll_seconds * 1000, 1), + ) + set_attribute("backoff_sleep_count", self._backoff_sleep_count) + set_attribute( + "backoff_sleep_ms_total", + round(self._backoff_sleep_seconds * 1000, 1), + ) + def fail_batch_for_child_error( self, *, @@ -219,19 +277,22 @@ def fail_batch_for_child_error( ) -> None: mark_child_failed(self.state, year=simulation_year, error=error) mark_batch_failed(self.state, error=error) - put_batch_job_state(self.state) + with segment(SegmentName.BUDGET_WINDOW_STATE_WRITE): + put_batch_job_state(self.state) def complete_batch(self) -> dict[str, 
Any]: - annual_impacts = [ - self.state.partial_annual_impacts[simulation_year] - for simulation_year in self.state.years - if simulation_year in self.state.partial_annual_impacts - ] - result = build_budget_window_result( - start_year=self.state.start_year, - window_size=self.state.window_size, - annual_impacts=annual_impacts, - ) + with segment(SegmentName.BUDGET_WINDOW_AGGREGATION): + annual_impacts = [ + self.state.partial_annual_impacts[simulation_year] + for simulation_year in self.state.years + if simulation_year in self.state.partial_annual_impacts + ] + result = build_budget_window_result( + start_year=self.state.start_year, + window_size=self.state.window_size, + annual_impacts=annual_impacts, + ) mark_batch_complete(self.state, result=result) - put_batch_job_state(self.state) + with segment(SegmentName.BUDGET_WINDOW_STATE_WRITE): + put_batch_job_state(self.state) return serialize_batch_status(self.state) diff --git a/projects/policyengine-api-simulation/src/modal/gateway/app.py b/projects/policyengine-api-simulation/src/modal/gateway/app.py index ec57b5dff..e2a48825d 100644 --- a/projects/policyengine-api-simulation/src/modal/gateway/app.py +++ b/projects/policyengine-api-simulation/src/modal/gateway/app.py @@ -10,9 +10,12 @@ import modal +from src.modal.logfire_legacy import configure_logfire + # Stable app name - this should rarely change app = modal.App("policyengine-simulation-gateway") gateway_auth_secret = modal.Secret.from_name("policyengine-gateway-auth") +logfire_secret = modal.Secret.from_name("policyengine-logfire") # Lightweight image for gateway - no heavy dependencies gateway_image = ( @@ -25,6 +28,8 @@ # JWTDecoder lives in the policyengine-fastapi lib; it only needs # the auth module at runtime here. 
"cryptography>=41.0.0", + "logfire>=3.0.0", + "policyengine-observability[fastapi]>=1.3.0,<2", ) .add_local_python_source( "src.modal", @@ -35,7 +40,7 @@ ) -@app.function(image=gateway_image, secrets=[gateway_auth_secret]) +@app.function(image=gateway_image, secrets=[gateway_auth_secret, logfire_secret]) @modal.asgi_app() def web_app(): """ @@ -49,12 +54,30 @@ def web_app(): """ from fastapi import FastAPI + from policyengine_api_simulation.observability import ( + configure_process_observability, + init_simulation_observability, + ) from src.modal.gateway.auth import ( enforce_auth_configured_guard, enforce_production_auth_guard, ) from src.modal.gateway.endpoints import router + api = FastAPI( + title="PolicyEngine Simulation Gateway", + description="Submit and poll simulation jobs. Routes to versioned simulation apps.", + version="1.0.0", + ) + configure_process_observability( + platform="modal", + service_role="modal_gateway", + modal_app_name="policyengine-simulation-gateway", + modal_function_name="web_app", + ) + init_simulation_observability(api, service_role="modal_gateway") + configure_logfire("policyengine-simulation-gateway") + # Startup guard: crash the container if GATEWAY_AUTH_DISABLED is set in # a production-equivalent Modal environment, or set without the # explicit acknowledgement env var. This prevents the bypass from @@ -63,10 +86,5 @@ def web_app(): enforce_production_auth_guard() enforce_auth_configured_guard() - api = FastAPI( - title="PolicyEngine Simulation Gateway", - description="Submit and poll simulation jobs. 
Routes to versioned simulation apps.", - version="1.0.0", - ) api.include_router(router) return api diff --git a/projects/policyengine-api-simulation/src/modal/gateway/auth.py b/projects/policyengine-api-simulation/src/modal/gateway/auth.py index 02e433474..d889ea057 100644 --- a/projects/policyengine-api-simulation/src/modal/gateway/auth.py +++ b/projects/policyengine-api-simulation/src/modal/gateway/auth.py @@ -34,7 +34,12 @@ from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from policyengine_observability import record_event from policyengine_fastapi.auth import JWTDecoder +from src.modal.logfire_legacy import ( + legacy_logfire_attributes, + logfire_is_configured, +) logger = logging.getLogger(__name__) @@ -142,9 +147,9 @@ def enforce_production_auth_guard() -> None: the bypass impossible to set via one stray env var — an operator must actively opt in. - Even when the guard passes, emit a ``CRITICAL`` log (and, if available, - a ``logfire.error``) so any audit of the service's logs surfaces the - bypass immediately. + Even when the guard passes, emit a ``CRITICAL`` log, a structured + observability event, and a legacy Logfire event so any audit of the + service's logs surfaces the bypass immediately. """ if not _auth_disabled(): return @@ -175,13 +180,24 @@ def enforce_production_auth_guard() -> None: ) logger.critical(banner) try: - import logfire # Local import: logfire is optional in tests. 
- - logfire.error( + record_event( "gateway_auth_disabled_bypass_active", modal_environment=modal_env, ack_value_present=True, + **legacy_logfire_attributes(), ) + except Exception: # pragma: no cover - observability must never block startup + pass + try: + if logfire_is_configured(): + import logfire + + logfire.error( + "gateway_auth_disabled_bypass_active", + modal_environment=modal_env, + ack_value_present=True, + **legacy_logfire_attributes(), + ) except Exception: # pragma: no cover - logfire optional / misconfigured pass diff --git a/projects/policyengine-api-simulation/src/modal/gateway/endpoints.py b/projects/policyengine-api-simulation/src/modal/gateway/endpoints.py index c9fc357a1..8a70be3d3 100644 --- a/projects/policyengine-api-simulation/src/modal/gateway/endpoints.py +++ b/projects/policyengine-api-simulation/src/modal/gateway/endpoints.py @@ -8,6 +8,12 @@ import modal from fastapi import APIRouter, Depends, HTTPException +from policyengine_observability import ( + record_error, + record_event, + segment, + set_attribute, +) from src.modal.budget_window_state import ( build_batch_status_response, @@ -42,6 +48,7 @@ from policyengine_api_simulation.hf_dataset import ( HuggingFaceDatasetReferenceError, ) +from policyengine_api_simulation.observability import SegmentName logger = logging.getLogger(__name__) @@ -65,6 +72,15 @@ def _job_metadata_store(): return modal.Dict.from_name(JOB_METADATA_DICT_NAME, create_if_missing=True) +def _record_not_found(message: str) -> None: + record_error( + LookupError(message), + handled=True, + status_code=404, + include_stack=False, + ) + + def _split_requested_revision(requested_data: str) -> tuple[str, str | None]: if "@" not in requested_data: return requested_data, None @@ -626,31 +642,43 @@ async def submit_simulation(request: SimulationRequest): Routes to the appropriate app based on country and version params. Returns immediately with job_id for polling. 
""" + set_attribute("country", request.country) + set_attribute("scope", request.scope) + set_attribute("run_id", request.telemetry.run_id if request.telemetry else None) try: - route = resolve_route( - request.country, - request.version, - request.policyengine_version, - ) + with segment(SegmentName.ROUTE_RESOLUTION): + route = resolve_route( + request.country, + request.version, + request.policyengine_version, + ) except ValueError as e: + record_error(e, handled=True, status_code=400, include_stack=False) raise HTTPException(status_code=400, detail=str(e)) - payload = request.model_dump( - exclude={"version", "policyengine_version", "telemetry"}, - mode="json", - exclude_none=True, - ) + set_attribute("resolved_app_name", route.app_name) + set_attribute("resolved_version", route.response_version) + set_attribute("policyengine_version", route.policyengine_version) + + with segment(SegmentName.REQUEST_PARSE): + payload = request.model_dump( + exclude={"version", "policyengine_version", "telemetry"}, + mode="json", + exclude_none=True, + ) run_id = request.telemetry.run_id if request.telemetry else None if request.telemetry is not None: payload["_telemetry"] = request.telemetry.model_dump(mode="json") try: - bundle = _build_policyengine_bundle( - request.country, - route, - payload, - ) + with segment(SegmentName.POLICYENGINE_BUNDLE): + bundle = _build_policyengine_bundle( + request.country, + route, + payload, + ) except (ValueError, HuggingFaceDatasetReferenceError) as exc: + record_error(exc, handled=True, status_code=400, include_stack=False) raise HTTPException(status_code=400, detail=str(exc)) from exc logger.info( @@ -661,25 +689,30 @@ async def submit_simulation(request: SimulationRequest): run_id, ) - # Get function reference from the target app - sim_func = modal.Function.from_name(route.app_name, "run_simulation") + # Spawn the job (returns immediately). 
``Function.from_name`` is a lazy + # handle — the control-plane RPC (hydration) happens inside ``spawn`` — + # so both live under the spawn segment to time the real network cost. + with segment(SegmentName.MODAL_FUNCTION_SPAWN): + sim_func = modal.Function.from_name(route.app_name, "run_simulation") + call = sim_func.spawn(payload) - # Spawn the job (returns immediately) - call = sim_func.spawn(payload) + set_attribute("job_id", call.object_id) job_metadata = _serialize_job_metadata(route.app_name, bundle, run_id) - _job_metadata_store()[call.object_id] = job_metadata - - return JobSubmitResponse( - job_id=call.object_id, - status="submitted", - poll_url=f"/jobs/{call.object_id}", - country=request.country, - version=route.response_version, - resolved_app_name=route.app_name, - policyengine_bundle=bundle, - run_id=run_id, - ) + with segment(SegmentName.MODAL_JOB_METADATA_WRITE): + _job_metadata_store()[call.object_id] = job_metadata + + with segment(SegmentName.RESPONSE_SERIALIZATION): + return JobSubmitResponse( + job_id=call.object_id, + status="submitted", + poll_url=f"/jobs/{call.object_id}", + country=request.country, + version=route.response_version, + resolved_app_name=route.app_name, + policyengine_bundle=bundle, + run_id=run_id, + ) @router.post( @@ -692,33 +725,47 @@ async def submit_budget_window_batch(request: BudgetWindowBatchRequest): """ Submit a budget-window batch job. 
""" + set_attribute("country", request.country) + set_attribute("run_id", request.telemetry.run_id if request.telemetry else None) try: - route = resolve_route( - request.country, - request.version, - request.policyengine_version, - ) + with segment(SegmentName.ROUTE_RESOLUTION): + route = resolve_route( + request.country, + request.version, + request.policyengine_version, + ) except ValueError as e: + record_error(e, handled=True, status_code=400, include_stack=False) raise HTTPException(status_code=400, detail=str(e)) + set_attribute("resolved_app_name", route.app_name) + set_attribute("resolved_version", route.response_version) + set_attribute("policyengine_version", route.policyengine_version) + try: - bundle = _build_policyengine_bundle( - request.country, - route, - request.model_dump(mode="json"), - ) + with segment(SegmentName.POLICYENGINE_BUNDLE): + bundle = _build_policyengine_bundle( + request.country, + route, + request.model_dump(mode="json"), + ) except (ValueError, HuggingFaceDatasetReferenceError) as exc: + record_error(exc, handled=True, status_code=400, include_stack=False) raise HTTPException(status_code=400, detail=str(exc)) from exc - payload = _build_budget_window_parent_payload( - request, - resolved_version=route.response_version, - resolved_app_name=route.app_name, - bundle=bundle, - ) + with segment(SegmentName.REQUEST_PARSE): + payload = _build_budget_window_parent_payload( + request, + resolved_version=route.response_version, + resolved_app_name=route.app_name, + bundle=bundle, + ) - batch_func = modal.Function.from_name(route.app_name, "run_budget_window_batch") - call = batch_func.spawn(payload) + # Lazy handle + spawn together: the RPC cost lands in ``spawn``. 
+ with segment(SegmentName.MODAL_FUNCTION_SPAWN): + batch_func = modal.Function.from_name(route.app_name, "run_budget_window_batch") + call = batch_func.spawn(payload) batch_job_id = call.object_id + set_attribute("batch_job_id", batch_job_id) seed_state = create_initial_batch_state( batch_job_id=batch_job_id, @@ -727,18 +774,20 @@ async def submit_budget_window_batch(request: BudgetWindowBatchRequest): resolved_app_name=route.app_name, bundle=bundle, ) - put_batch_job_seed(seed_state) + with segment(SegmentName.MODAL_JOB_METADATA_WRITE): + put_batch_job_seed(seed_state) - return BudgetWindowBatchSubmitResponse( - batch_job_id=batch_job_id, - status=seed_state.status, - poll_url=f"/budget-window-jobs/{batch_job_id}", - country=request.country, - version=route.response_version, - resolved_app_name=route.app_name, - policyengine_bundle=bundle, - run_id=seed_state.run_id, - ) + with segment(SegmentName.RESPONSE_SERIALIZATION): + return BudgetWindowBatchSubmitResponse( + batch_job_id=batch_job_id, + status=seed_state.status, + poll_url=f"/budget-window-jobs/{batch_job_id}", + country=request.country, + version=route.response_version, + resolved_app_name=route.app_name, + policyengine_bundle=bundle, + run_id=seed_state.run_id, + ) @router.get( @@ -757,33 +806,44 @@ async def get_job_status(job_id: str): - 500 with status="failed" and error on failure - 404 if job_id not found """ - job_metadata = _job_metadata_store().get(job_id) + set_attribute("job_id", job_id) + with segment(SegmentName.MODAL_JOB_METADATA_READ): + job_metadata = _job_metadata_store().get(job_id) if job_metadata is None: + _record_not_found(f"Job not found: {job_id}") raise HTTPException(status_code=404, detail=f"Job not found: {job_id}") try: - call = modal.FunctionCall.from_id(job_id) + with segment(SegmentName.MODAL_JOB_STATUS_POLL): + call = modal.FunctionCall.from_id(job_id) except Exception as exc: if _is_modal_job_not_found(exc): + record_error(exc, handled=True, status_code=404, 
include_stack=False) raise HTTPException(status_code=404, detail=f"Job not found: {job_id}") + record_error(exc, handled=False, status_code=500) raise try: - result = call.get(timeout=0) - return JobStatusResponse( - status="complete", result=result, **(job_metadata or {}) - ) + with segment(SegmentName.MODAL_JOB_STATUS_POLL): + result = call.get(timeout=0) + with segment(SegmentName.RESPONSE_SERIALIZATION): + return JobStatusResponse( + status="complete", result=result, **(job_metadata or {}) + ) except TimeoutError: - return running_job_response(job_metadata) + with segment(SegmentName.RESPONSE_SERIALIZATION): + return running_job_response(job_metadata) except Exception as exc: if _is_modal_job_not_found(exc): + record_error(exc, handled=True, status_code=404, include_stack=False) raise HTTPException(status_code=404, detail=f"Job not found: {job_id}") redacted = log_and_redact_exception( exc, scope="simulation_job_status", context={"job_id": job_id}, ) - return failed_job_response(error=redacted, job_metadata=job_metadata) + with segment(SegmentName.RESPONSE_SERIALIZATION): + return failed_job_response(error=redacted, job_metadata=job_metadata) @router.get( @@ -796,25 +856,50 @@ async def get_budget_window_job_status(batch_job_id: str): """ Poll for budget-window batch status. 
""" - state = get_batch_job_state(batch_job_id) + set_attribute("batch_job_id", batch_job_id) + with segment(SegmentName.BUDGET_WINDOW_STATE_LOAD): + state = get_batch_job_state(batch_job_id) if state is not None: - return batch_status_response(build_batch_status_response(state)) + with segment(SegmentName.BUDGET_WINDOW_STATUS_SERIALIZATION): + return batch_status_response(build_batch_status_response(state)) - seed_state = get_batch_job_seed(batch_job_id) + with segment(SegmentName.BUDGET_WINDOW_STATE_LOAD): + seed_state = get_batch_job_seed(batch_job_id) if seed_state is None: + _record_not_found(f"Budget-window job not found: {batch_job_id}") raise HTTPException( status_code=404, detail=f"Budget-window job not found: {batch_job_id}" ) try: - call = modal.FunctionCall.from_id(batch_job_id) - except Exception: - return batch_status_response(build_batch_status_response(seed_state)) + with segment(SegmentName.MODAL_JOB_STATUS_POLL): + call = modal.FunctionCall.from_id(batch_job_id) + except Exception as exc: + # The endpoint degrades gracefully to the seed state (a successful + # 202/200 response), so this must NOT be recorded as a request + # error — record_error would emit an http_request_failed event and + # an error metric contradicting the status the client actually saw. 
+ record_event( + "budget_window_parent_lookup_degraded", + batch_job_id=batch_job_id, + error_type=type(exc).__name__, + parent_call_not_found=_is_modal_job_not_found(exc), + ) + logger.warning( + "Budget-window parent FunctionCall lookup failed; " + "serving seed state (batch_job_id=%s)", + batch_job_id, + exc_info=exc, + ) + with segment(SegmentName.BUDGET_WINDOW_STATUS_SERIALIZATION): + return batch_status_response(build_batch_status_response(seed_state)) try: - result = call.get(timeout=0) + with segment(SegmentName.MODAL_JOB_STATUS_POLL): + result = call.get(timeout=0) except TimeoutError: - return batch_status_response(build_batch_status_response(seed_state)) + with segment(SegmentName.BUDGET_WINDOW_STATUS_SERIALIZATION): + return batch_status_response(build_batch_status_response(seed_state)) except Exception as exc: # Persist the failure so subsequent polls don't resurrect the # "submitted" status from the seed store (#448). We deliberately @@ -827,33 +912,41 @@ async def get_budget_window_job_status(batch_job_id: str): ) seed_state.status = "failed" seed_state.error = redacted - put_batch_job_state(seed_state) - put_batch_job_seed(seed_state) - return batch_status_response(build_batch_status_response(seed_state)) + with segment(SegmentName.BUDGET_WINDOW_STATE_WRITE): + put_batch_job_state(seed_state) + put_batch_job_seed(seed_state) + with segment(SegmentName.BUDGET_WINDOW_STATUS_SERIALIZATION): + return batch_status_response(build_batch_status_response(seed_state)) - response = BudgetWindowBatchStatusResponse.model_validate(result) - return batch_status_response(response) + with segment(SegmentName.BUDGET_WINDOW_RESULT_PARSE): + response = BudgetWindowBatchStatusResponse.model_validate(result) + with segment(SegmentName.BUDGET_WINDOW_STATUS_SERIALIZATION): + return batch_status_response(response) @router.get("/versions") async def list_versions(): """List all available routing versions.""" - state = _active_routing_state() + with 
segment(SegmentName.ROUTE_RESOLUTION): + state = _active_routing_state() if state: return { kind: _version_map_from_state(state, kind) for kind in SUPPORTED_ROUTE_KINDS } - policyengine_dict = _optional_modal_dict(POLICYENGINE_VERSION_DICT_NAME) - us_dict = modal.Dict.from_name("simulation-api-us-versions") - uk_dict = modal.Dict.from_name("simulation-api-uk-versions") - return { - "policyengine": ( - dict(policyengine_dict) if policyengine_dict is not None else {} - ), - "us": dict(us_dict), - "uk": dict(uk_dict), - } + # ``Dict.from_name`` is a lazy handle; the RPCs fire during the + # ``dict(...)`` iterations, so those are what the segment must cover. + with segment(SegmentName.MODAL_DICT_READ): + policyengine_dict = _optional_modal_dict(POLICYENGINE_VERSION_DICT_NAME) + us_dict = modal.Dict.from_name("simulation-api-us-versions") + uk_dict = modal.Dict.from_name("simulation-api-uk-versions") + return { + "policyengine": ( + dict(policyengine_dict) if policyengine_dict is not None else {} + ), + "us": dict(us_dict), + "uk": dict(uk_dict), + } def _version_map_from_state(state: dict, kind: str) -> dict: @@ -869,18 +962,24 @@ async def get_country_versions(kind: str): """Get available versions for policyengine, US, or UK routing.""" kind_lower = kind.lower() if kind_lower not in SUPPORTED_ROUTE_KINDS: + _record_not_found(f"Unknown version kind: {kind}") raise HTTPException(status_code=404, detail=f"Unknown version kind: {kind}") - state = _active_routing_state() + with segment(SegmentName.ROUTE_RESOLUTION): + state = _active_routing_state() if state: return _version_map_from_state(state, kind_lower) + # The ``dict(...)`` iteration is where the Modal Dict RPCs happen; the + # from_name handle itself is lazy and free. 
if kind_lower == "policyengine": - version_dict = _optional_modal_dict(POLICYENGINE_VERSION_DICT_NAME) - return dict(version_dict) if version_dict is not None else {} + with segment(SegmentName.MODAL_DICT_READ): + version_dict = _optional_modal_dict(POLICYENGINE_VERSION_DICT_NAME) + return dict(version_dict) if version_dict is not None else {} - version_dict = modal.Dict.from_name(f"simulation-api-{kind_lower}-versions") - return dict(version_dict) + with segment(SegmentName.MODAL_DICT_READ): + version_dict = modal.Dict.from_name(f"simulation-api-{kind_lower}-versions") + return dict(version_dict) @router.get("/health") diff --git a/projects/policyengine-api-simulation/src/modal/gateway/errors.py b/projects/policyengine-api-simulation/src/modal/gateway/errors.py index dd3091360..c992ce04e 100644 --- a/projects/policyengine-api-simulation/src/modal/gateway/errors.py +++ b/projects/policyengine-api-simulation/src/modal/gateway/errors.py @@ -3,11 +3,16 @@ We intentionally do **not** leak ``str(exc)`` to API callers. Worker-side exceptions routinely embed container paths, HF URLs with pre-signed tokens, and internal parameter names that we do not want to surface to the public -internet. Instead, we record the full exception server-side via -:func:`logfire.exception` (or the configured structured logger) and return -the caller a stable generic message plus a correlation id they can cite to +internet. Instead, we record the full exception server-side, then return the +caller a stable generic message plus a correlation id they can cite to support. +The stdlib logger is the guaranteed server-side sink: it always fires, so the +correlation id in the caller-facing message always maps to a log line with the +full message and stack trace. ``policyengine-observability`` and legacy +Logfire (kept while we evaluate replacing it) are best-effort on top — both +silently no-op when their runtime is disabled or unconfigured. 
+ Two helpers live here: - :func:`log_and_redact_exception` for request-handler code paths that want @@ -22,6 +27,12 @@ import uuid from typing import Any +from policyengine_observability import record_error, record_event +from src.modal.logfire_legacy import ( + legacy_logfire_attributes, + logfire_is_configured, +) + logger = logging.getLogger(__name__) @@ -31,20 +42,6 @@ _logfire = None -def _logfire_is_configured() -> bool: - if _logfire is None: - return False - # logfire exposes a module-global "configured" flag only in newer - # versions; fall back to the private ``DEFAULT_LOGFIRE_INSTANCE``. - try: - instance = getattr(_logfire, "DEFAULT_LOGFIRE_INSTANCE", None) - if instance is None: - return False - return bool(getattr(instance.config, "send_to_logfire", False)) - except Exception: - return False - - GENERIC_JOB_FAILURE_MESSAGE = "Simulation failed" @@ -71,31 +68,48 @@ def log_and_redact_exception( payload = { "correlation_id": correlation_id, "scope": scope, + "error_type": type(exc).__name__, + **legacy_logfire_attributes(), } if context: payload.update(context) - if _logfire_is_configured(): - try: - _logfire.exception( # type: ignore[union-attr] - "Gateway {scope} failed", - scope=scope, - correlation_id=correlation_id, - **(context or {}), - ) - except Exception: # pragma: no cover - defensive, never raise from logger - logger.exception( - "Gateway %s failed (correlation_id=%s)", - scope, - correlation_id, - extra=payload, - ) - else: - logger.exception( + # Guaranteed sink. record_error/record_event silently no-op when the + # observability runtime is disabled or unconfigured (they never raise), + # so this stdlib line is what makes the correlation id in the caller's + # message always resolvable to a full message + stack server-side. 
+ try: + logger.error( "Gateway %s failed (correlation_id=%s)", scope, correlation_id, + exc_info=exc, extra=payload, ) + except Exception: # pragma: no cover - context key clashed with LogRecord + logger.error( + "Gateway %s failed (correlation_id=%s)", + scope, + correlation_id, + exc_info=exc, + ) + + try: + record_error(exc, handled=True, status_code=500) + record_event( + "gateway_error_redacted", + **payload, + ) + except Exception: # pragma: no cover - defensive, never raise from logger + pass + + if logfire_is_configured(): + try: + _logfire.exception( # type: ignore[union-attr] + "Gateway {scope} failed", + **payload, + ) + except Exception: # pragma: no cover - defensive, never raise from logger + pass return f"{GENERIC_JOB_FAILURE_MESSAGE} (correlation_id={correlation_id})" diff --git a/projects/policyengine-api-simulation/src/modal/logfire_legacy.py b/projects/policyengine-api-simulation/src/modal/logfire_legacy.py new file mode 100644 index 000000000..592b9ef84 --- /dev/null +++ b/projects/policyengine-api-simulation/src/modal/logfire_legacy.py @@ -0,0 +1,75 @@ +"""Legacy Logfire helpers used while evaluating a replacement platform.""" + +from __future__ import annotations + +import os +from contextlib import nullcontext +from typing import Any + +from policyengine_api_simulation.observability import ( + logfire_replacement_attributes, +) + + +# Whether configure_logfire successfully configured export in this process. +# Logfire's own ``config.send_to_logfire`` flag defaults to ``True`` on an +# UNCONFIGURED instance, so it cannot be used to answer "did we configure +# Logfire?" — callers must consult this flag instead. 
+_logfire_configured = False + + +def legacy_logfire_attributes() -> dict[str, str]: + return logfire_replacement_attributes() + + +def logfire_is_configured() -> bool: + """Whether configure_logfire ran with a token in this process.""" + return _logfire_configured + + +def configure_logfire( + service_name: str, + *, + default_environment: str | None = None, +) -> bool: + """Configure legacy Logfire export if the Modal secret is present.""" + global _logfire_configured + + token = os.environ.get("LOGFIRE_TOKEN", "") + if not token: + _logfire_configured = False + return False + + import logfire + + logfire.configure( + service_name=service_name, + token=token, + environment=( + os.environ.get("LOGFIRE_ENVIRONMENT") + or default_environment + or os.environ.get("MODAL_ENVIRONMENT") + or "production" + ), + console=False, + ) + _logfire_configured = True + return True + + +def logfire_span(enabled: bool, name: str, **attrs: Any): + if not enabled: + return nullcontext() + + import logfire + + return logfire.span(name, **attrs) + + +def flush_logfire(enabled: bool) -> None: + if not enabled: + return + + import logfire + + logfire.force_flush() diff --git a/projects/policyengine-api-simulation/src/modal/logging_redaction.py b/projects/policyengine-api-simulation/src/modal/logging_redaction.py index 1a62a8e16..9e9ce7d80 100644 --- a/projects/policyengine-api-simulation/src/modal/logging_redaction.py +++ b/projects/policyengine-api-simulation/src/modal/logging_redaction.py @@ -1,4 +1,4 @@ -"""Pure-Python payload redaction helpers for Logfire spans. +"""Payload redaction helpers for structured logs and legacy Logfire spans. We keep these in a separate module from :mod:`src.modal.app` so they can be unit-tested without instantiating the Modal app (which requires runtime @@ -8,24 +8,24 @@ from __future__ import annotations -# Keys in request payloads that must never reach Logfire. 
``data`` is a -# signed GCS/HF URL (may contain embedded short-lived credentials), the -# reform/baseline parameter trees are potentially large and reveal -# proprietary policy design, and ``_telemetry`` / ``_metadata`` hold -# internal routing/correlation fields we log separately via dedicated -# structured attributes. +# Keys in request payloads that must never reach observability backends. +# ``data`` is a signed GCS/HF URL (may contain embedded short-lived +# credentials), the reform/baseline parameter trees are potentially large +# and reveal proprietary policy design, and ``_telemetry`` / ``_metadata`` +# hold internal routing and correlation fields we log separately via +# dedicated structured attributes. SENSITIVE_PARAM_KEYS = ("data", "reform", "baseline", "_telemetry", "_metadata") def redact_params_for_logging(params) -> dict: - """Return a shallow copy of ``params`` safe to hand to Logfire. + """Return a shallow copy of ``params`` safe for observability attributes. We preserve routing-relevant fields (country, scope, version, time period, etc.) so operators can still trace which simulation a span corresponds to, but strip any field that may contain URLs with signed credentials or arbitrarily large user-submitted parameter trees. Non-dict inputs return an empty dict so callers can splat the result - into ``logfire.span(**...)`` without additional guards. + into operation attributes without additional guards. 
""" if not isinstance(params, dict): diff --git a/projects/policyengine-api-simulation/src/policyengine_api_simulation/main.py b/projects/policyengine-api-simulation/src/policyengine_api_simulation/main.py index 8606f67b2..7c3d538e1 100644 --- a/projects/policyengine-api-simulation/src/policyengine_api_simulation/main.py +++ b/projects/policyengine-api-simulation/src/policyengine_api_simulation/main.py @@ -1,19 +1,10 @@ from contextlib import asynccontextmanager from fastapi import FastAPI -from .settings import get_settings, Environment -from policyengine_fastapi.opentelemetry import ( - GCPLoggingInstrumentor, - FastAPIEnhancedInstrumenter, - export_ot_to_console, - export_ot_to_gcp, -) from policyengine_fastapi.exit import exit -from opentelemetry.sdk.resources import ( - SERVICE_NAME, - SERVICE_INSTANCE_ID, - Resource, -) from policyengine_api_simulation import initialize +from policyengine_api_simulation.observability import ( + init_simulation_observability, +) from policyengine_fastapi import ping from policyengine_fastapi.health import ( HealthRegistry, @@ -24,8 +15,8 @@ """ specific example instantiation of the app configured by a .env file * in all environments we use sqlite -* on desktop we print opentelemetry instrumentation to the console. -* in "production" we use GCP trace/metrics bindings. +* observability emits standard structured logs through policyengine-observability; + legacy Logfire export remains while we evaluate a replacement platform. 
""" logger = logging.getLogger(__name__) @@ -42,6 +33,7 @@ async def lifespan(app: FastAPI): title="policyengine-api-simulation", summary="Policyengine simulation api", ) +init_simulation_observability(app, service_role="api") # attach the api defined in the app package initialize(app=app) @@ -50,23 +42,3 @@ async def lifespan(app: FastAPI): health_registry = HealthRegistry() health_registry.register(HealthSystemReporter("general", {})) ping.include_all_routers(app, health_registry) - -# configure tracing and metrics -GCPLoggingInstrumentor().instrument() -FastAPIEnhancedInstrumenter().instrument(app) - -resource = Resource.create( - attributes={ - SERVICE_NAME: get_settings().ot_service_name, - SERVICE_INSTANCE_ID: get_settings().ot_service_instance_id, - } -) - -match get_settings().environment: - case Environment.DESKTOP: - pass # Don't print opentelemetry to console- this makes it impossible to read the logs. Alternatively, do by uncommenting this line. - # export_ot_to_console(resource) - case Environment.PRODUCTION: - export_ot_to_gcp(resource) - case value: - raise Exception(f"Forgot to handle environment value {value}") diff --git a/projects/policyengine-api-simulation/src/policyengine_api_simulation/observability.py b/projects/policyengine-api-simulation/src/policyengine_api_simulation/observability.py new file mode 100644 index 000000000..4a46ff01b --- /dev/null +++ b/projects/policyengine-api-simulation/src/policyengine_api_simulation/observability.py @@ -0,0 +1,241 @@ +from __future__ import annotations + +import os +from dataclasses import replace +from enum import StrEnum +from typing import Any + +from fastapi import FastAPI +from policyengine_observability import ( + UNKNOWN_SEGMENT, + ObservabilityConfig, + ObservabilityRuntime, + set_observability_runtime, +) +from policyengine_observability.adapters.fastapi import ( + init_fastapi_observability, +) + + +SERVICE_NAME = "policyengine-api-simulation" +SPAN_PREFIX = "simulation" +LOG_DESTINATIONS = 
("stdout",) +LOGFIRE_STATUS = "legacy_candidate_for_replacement" +LOGFIRE_REPLACEMENT_CANDIDATE = "policyengine-observability" +SIMULATION_METRIC_ATTRIBUTE_KEYS = ( + "batch_job_id", + "country", + "function_call_id", + "geography_code", + "geography_type", + "job_id", + "modal_app_name", + "modal_environment", + "modal_function_name", + "platform", + "policyengine_version", + "process_id", + "region", + "request_id", + "resolved_app_name", + "resolved_version", + "run_id", + "logfire_status", + "logfire_replacement_candidate", + "runtime_role", + "scope", + "simulation_year", +) + + +class SegmentName(StrEnum): + UNKNOWN = UNKNOWN_SEGMENT + + REQUEST_PARSE = "request_parse" + ROUTE_RESOLUTION = "route_resolution" + POLICYENGINE_BUNDLE = "policyengine_bundle" + # NOTE: there is deliberately no modal_function_lookup segment — + # ``Function.from_name`` is a lazy handle whose control-plane RPC fires + # inside ``spawn``, so lookup + spawn are timed together under + # MODAL_FUNCTION_SPAWN. 
+ MODAL_FUNCTION_SPAWN = "modal_function_spawn" + MODAL_DICT_READ = "modal_dict_read" + MODAL_JOB_METADATA_WRITE = "modal_job_metadata_write" + MODAL_JOB_STATUS_POLL = "modal_job_status_poll" + + CREDENTIAL_SETUP = "credential_setup" + COUNTRY_MODULE_LOAD = "country_module_load" + REGION_RESOLUTION = "region_resolution" + DATASET_RESOLUTION = "dataset_resolution" + DATASET_LOAD = "dataset_load" + POLICY_NORMALIZATION = "policy_normalization" + SIMULATION_BUILD = "simulation_build" + CALCULATION = "calculation" + RESPONSE_SERIALIZATION = "response_serialization" + + BUDGET_WINDOW_CONTEXT = "budget_window_context" + BUDGET_WINDOW_STATE_LOAD = "budget_window_state_load" + BUDGET_WINDOW_STATE_WRITE = "budget_window_state_write" + BUDGET_WINDOW_CHILD_SPAWN = "budget_window_child_spawn" + BUDGET_WINDOW_RESULT_PARSE = "budget_window_result_parse" + BUDGET_WINDOW_AGGREGATION = "budget_window_aggregation" + BUDGET_WINDOW_CHILD_REQUEST_BUILD = "budget_window_child_request_build" + BUDGET_WINDOW_STATUS_SERIALIZATION = "budget_window_status_serialization" + # NOTE: per-iteration poll/sleep segments were removed deliberately — + # the scheduler's poll loop publishes bounded aggregate attributes + # (child_poll_count/child_poll_ms_total, backoff_sleep_count/ + # backoff_sleep_ms_total) instead, because one segment per probe grows + # the operation's segment tree without bound over a long batch. 
+ + MODAL_JOB_METADATA_READ = "modal_job_metadata_read" + + SIMULATION_OUTPUT_BUILD = "simulation_output_build" + SIMULATION_OUTPUT_MODEL_DUMP = "simulation_output_model_dump" + ECONOMIC_IMPACT_ANALYSIS = "economic_impact_analysis" + OUTPUT_BUDGETARY_IMPACT = "output_budgetary_impact" + OUTPUT_DETAILED_BUDGET = "output_detailed_budget" + OUTPUT_DECILE = "output_decile" + OUTPUT_INEQUALITY = "output_inequality" + OUTPUT_POVERTY = "output_poverty" + OUTPUT_INTRA_DECILE = "output_intra_decile" + OUTPUT_WEALTH_DECILE = "output_wealth_decile" + OUTPUT_INTRA_WEALTH_DECILE = "output_intra_wealth_decile" + OUTPUT_LABOR_SUPPLY = "output_labor_supply" + OUTPUT_CONGRESSIONAL_DISTRICT = "output_congressional_district" + OUTPUT_UK_CONSTITUENCY = "output_uk_constituency" + OUTPUT_UK_LOCAL_AUTHORITY = "output_uk_local_authority" + OUTPUT_CLIFF = "output_cliff" + OUTPUT_MODEL_VERSION = "output_model_version" + OUTPUT_DATA_VERSION = "output_data_version" + + +def configure_process_observability( + *, + platform: str, + service_role: str, + runtime_role: str | None = None, + modal_app_name: str | None = None, + modal_function_name: str | None = None, +) -> None: + os.environ["OBSERVABILITY_PLATFORM"] = platform + os.environ["OBSERVABILITY_SERVICE_ROLE"] = service_role + os.environ["OBSERVABILITY_RUNTIME_ROLE"] = runtime_role or service_role + if modal_app_name: + os.environ["OBSERVABILITY_MODAL_APP_NAME"] = modal_app_name + if modal_function_name: + os.environ["OBSERVABILITY_MODAL_FUNCTION_NAME"] = modal_function_name + + +def init_simulation_observability( + app: FastAPI, + *, + service_role: str = "api", +) -> ObservabilityRuntime: + service_role = _service_role(service_role) + platform = _platform() + config = _config(service_role=service_role, platform=platform) + return init_fastapi_observability( + app, + config=config, + runtime=ObservabilityRuntime(config, segment_registry=SegmentName), + service_name=SERVICE_NAME, + service_role=service_role, + span_prefix=SPAN_PREFIX, + 
segment_registry=SegmentName, + static_attributes=_metadata(service_role, platform), + ) + + +def init_process_observability( + *, + service_role: str, +) -> ObservabilityRuntime: + service_role = _service_role(service_role) + platform = _platform() + config = _config(service_role=service_role, platform=platform) + runtime = ObservabilityRuntime(config, segment_registry=SegmentName) + runtime.configure() + set_observability_runtime(runtime) + return runtime + + +def process_static_attributes(*, service_role: str) -> dict[str, Any]: + """Static identity attributes for non-FastAPI (worker) operations. + + The FastAPI adapter applies these per request via ``static_attributes``; + plain-process runtimes have no equivalent hook, so worker entrypoints must + merge this dict into their ``operation(...)`` attributes explicitly — + otherwise operation logs carry no platform/Modal identity. + """ + return _metadata(_service_role(service_role), _platform()) + + +def logfire_replacement_attributes() -> dict[str, str]: + return { + "logfire_status": LOGFIRE_STATUS, + "logfire_replacement_candidate": LOGFIRE_REPLACEMENT_CANDIDATE, + } + + +def _config( + *, + service_role: str, + platform: str, +) -> ObservabilityConfig: + config = ObservabilityConfig.from_env( + service_name=SERVICE_NAME, + service_role=service_role, + span_prefix=SPAN_PREFIX, + extra_metric_attribute_keys=SIMULATION_METRIC_ATTRIBUTE_KEYS, + default_log_destinations=LOG_DESTINATIONS, + ) + return replace( + config, + environment=_environment(), + log_destinations=LOG_DESTINATIONS, + otel_enabled=False, + google_cloud_project=None, + ) + + +def _environment() -> str: + return ( + os.getenv("OBSERVABILITY_ENVIRONMENT") + or os.getenv("MODAL_ENVIRONMENT") + or os.getenv("DEPLOYMENT_ENVIRONMENT") + or os.getenv("APP_ENV") + or os.getenv("ENVIRONMENT") + or "local" + ) + + +def _service_role(default: str) -> str: + return ( + os.getenv("OBSERVABILITY_SERVICE_ROLE") + or os.getenv("OBSERVABILITY_RUNTIME_ROLE") + or 
default + ) + + +def _platform() -> str: + configured = os.getenv("OBSERVABILITY_PLATFORM") + if configured: + return configured + if os.getenv("MODAL_ENVIRONMENT") or os.getenv("MODAL_TASK_ID"): + return "modal" + return "local" + + +def _metadata(service_role: str, platform: str) -> dict[str, Any]: + values = { + "platform": platform, + "runtime_role": os.getenv("OBSERVABILITY_RUNTIME_ROLE") or service_role, + "modal_environment": os.getenv("MODAL_ENVIRONMENT"), + "modal_app_name": os.getenv("OBSERVABILITY_MODAL_APP_NAME"), + "modal_function_name": os.getenv("OBSERVABILITY_MODAL_FUNCTION_NAME"), + **logfire_replacement_attributes(), + } + return {key: value for key, value in values.items() if value} + + +set_observability_runtime(ObservabilityRuntime.disabled()) diff --git a/projects/policyengine-api-simulation/src/policyengine_api_simulation/simulation_output_builder.py b/projects/policyengine-api-simulation/src/policyengine_api_simulation/simulation_output_builder.py index f68b4ac78..5b7050444 100644 --- a/projects/policyengine-api-simulation/src/policyengine_api_simulation/simulation_output_builder.py +++ b/projects/policyengine-api-simulation/src/policyengine_api_simulation/simulation_output_builder.py @@ -5,6 +5,8 @@ from dataclasses import dataclass, field from typing import Any +from policyengine_observability import segment + from policyengine_api_simulation import simulation_output_budget from policyengine_api_simulation import simulation_output_cliff from policyengine_api_simulation import simulation_output_distribution @@ -12,6 +14,7 @@ from policyengine_api_simulation import simulation_output_inequality from policyengine_api_simulation import simulation_output_labor from policyengine_api_simulation import simulation_output_poverty +from policyengine_api_simulation.observability import SegmentName from policyengine_api_simulation.release_bundle import get_country_release_bundle from policyengine_api_simulation.simulation_macro_output import ( 
BudgetaryImpact, @@ -47,87 +50,107 @@ def __post_init__(self) -> None: @property def analysis(self) -> Any: if self._analysis is None: - self._analysis = self.country_module.economic_impact_analysis( - self.baseline, - self.reform, - include_cliff_impacts=self._include_cliff_impacts(), - ) + with segment(SegmentName.ECONOMIC_IMPACT_ANALYSIS): + self._analysis = self.country_module.economic_impact_analysis( + self.baseline, + self.reform, + include_cliff_impacts=self._include_cliff_impacts(), + ) return self._analysis def _include_cliff_impacts(self) -> bool: return self.simulation_params.get("include_cliffs") is True def build(self) -> SingleYearMacroOutput: - poverty_outputs = self._build_poverty_outputs() - wealth_decile = getattr(self.analysis, "wealth_decile_impacts", None) - intra_wealth_decile = getattr( - self.analysis, "intra_wealth_decile_impacts", None - ) + with segment(SegmentName.SIMULATION_OUTPUT_BUILD): + poverty_outputs = self._build_poverty_outputs() + wealth_decile = getattr(self.analysis, "wealth_decile_impacts", None) + intra_wealth_decile = getattr( + self.analysis, "intra_wealth_decile_impacts", None + ) - return SingleYearMacroOutput( - model_version=self._model_version(), - data_version=self._data_version(), - budget=self._build_budgetary_impact(), - detailed_budget=self._build_detailed_budget(), - decile=self._build_decile(), - inequality=self._build_inequality(), - poverty=poverty_outputs.poverty, - poverty_by_gender=poverty_outputs.poverty_by_gender, - poverty_by_race=poverty_outputs.poverty_by_race, - intra_decile=self._build_intra_decile_output(), - wealth_decile=self._build_wealth_decile(wealth_decile), - intra_wealth_decile=self._build_intra_wealth_decile(intra_wealth_decile), - labor_supply_response=self._build_labor_supply_response(), - congressional_district_impact=(self._build_congressional_district_impact()), - constituency_impact=self._build_uk_constituency_impact(), - 
local_authority_impact=self._build_uk_local_authority_impact(), - cliff_impact=self._build_cliff_impact(), - ) + return SingleYearMacroOutput( + model_version=self._model_version(), + data_version=self._data_version(), + budget=self._build_budgetary_impact(), + detailed_budget=self._build_detailed_budget(), + decile=self._build_decile(), + inequality=self._build_inequality(), + poverty=poverty_outputs.poverty, + poverty_by_gender=poverty_outputs.poverty_by_gender, + poverty_by_race=poverty_outputs.poverty_by_race, + intra_decile=self._build_intra_decile_output(), + wealth_decile=self._build_wealth_decile(wealth_decile), + intra_wealth_decile=self._build_intra_wealth_decile( + intra_wealth_decile + ), + labor_supply_response=self._build_labor_supply_response(), + congressional_district_impact=( + self._build_congressional_district_impact() + ), + constituency_impact=self._build_uk_constituency_impact(), + local_authority_impact=self._build_uk_local_authority_impact(), + cliff_impact=self._build_cliff_impact(), + ) def serialize(self) -> dict[str, Any]: - return self.build().model_dump(mode="json") + with segment(SegmentName.CALCULATION): + output = self.build() + with segment(SegmentName.RESPONSE_SERIALIZATION): + with segment(SegmentName.SIMULATION_OUTPUT_MODEL_DUMP): + return output.model_dump(mode="json") def _build_detailed_budget(self) -> DetailedBudgetOutput: - return simulation_output_budget.build_detailed_budget(self.analysis) + with segment(SegmentName.OUTPUT_DETAILED_BUDGET): + return simulation_output_budget.build_detailed_budget(self.analysis) def _build_decile(self) -> DecileOutput: - return simulation_output_distribution.build_decile(self.analysis) + with segment(SegmentName.OUTPUT_DECILE): + return simulation_output_distribution.build_decile(self.analysis) def _build_inequality(self) -> InequalityOutput: - return simulation_output_inequality.build_inequality(self.analysis) + with segment(SegmentName.OUTPUT_INEQUALITY): + return 
simulation_output_inequality.build_inequality(self.analysis) def _build_budgetary_impact(self) -> BudgetaryImpact: - return simulation_output_budget.build_budgetary_impact( - self.country, self.baseline, self.reform - ) + with segment(SegmentName.OUTPUT_BUDGETARY_IMPACT): + return simulation_output_budget.build_budgetary_impact( + self.country, self.baseline, self.reform + ) def _build_poverty_outputs(self) -> PovertyModuleOutputs: - return simulation_output_poverty.build_poverty_outputs( - self.country, self.baseline, self.reform, self.analysis - ) + with segment(SegmentName.OUTPUT_POVERTY): + return simulation_output_poverty.build_poverty_outputs( + self.country, self.baseline, self.reform, self.analysis + ) def _build_intra_decile_output(self) -> IntraDecileOutput: - return simulation_output_distribution.build_intra_decile_output( - self.baseline, self.reform - ) + with segment(SegmentName.OUTPUT_INTRA_DECILE): + return simulation_output_distribution.build_intra_decile_output( + self.baseline, self.reform + ) def _build_wealth_decile(self, wealth_decile: Any) -> DecileOutput | None: - return simulation_output_distribution.build_wealth_decile( - self.country, wealth_decile - ) + with segment(SegmentName.OUTPUT_WEALTH_DECILE): + return simulation_output_distribution.build_wealth_decile( + self.country, wealth_decile + ) def _build_intra_wealth_decile( self, intra_wealth_decile: Any ) -> IntraDecileOutput | None: - return simulation_output_distribution.build_intra_wealth_decile( - self.country, intra_wealth_decile - ) + with segment(SegmentName.OUTPUT_INTRA_WEALTH_DECILE): + return simulation_output_distribution.build_intra_wealth_decile( + self.country, intra_wealth_decile + ) def _build_labor_supply_response(self) -> LaborSupplyResponseOutput | None: - return simulation_output_labor.build_labor_supply_response(self.analysis) + with segment(SegmentName.OUTPUT_LABOR_SUPPLY): + return simulation_output_labor.build_labor_supply_response(self.analysis) def 
_build_cliff_impact(self) -> CliffImpactOutput | None: - return simulation_output_cliff.build_cliff_impact(self.analysis) + with segment(SegmentName.OUTPUT_CLIFF): + return simulation_output_cliff.build_cliff_impact(self.analysis) def _build_geographic_impact_output( self, value: Any @@ -186,40 +209,45 @@ def _build_poverty_by_race_output( def _build_congressional_district_impact( self, ) -> GeographicImpactOutput | None: - return simulation_output_geographic.build_congressional_district_impact( - self.country, self.baseline, self.reform - ) + with segment(SegmentName.OUTPUT_CONGRESSIONAL_DISTRICT): + return simulation_output_geographic.build_congressional_district_impact( + self.country, self.baseline, self.reform + ) def _build_uk_constituency_impact(self) -> GeographicImpactOutput | None: - return simulation_output_geographic.build_uk_constituency_impact( - self.country, self.baseline, self.reform - ) + with segment(SegmentName.OUTPUT_UK_CONSTITUENCY): + return simulation_output_geographic.build_uk_constituency_impact( + self.country, self.baseline, self.reform + ) def _build_uk_local_authority_impact(self) -> GeographicImpactOutput | None: - return simulation_output_geographic.build_uk_local_authority_impact( - self.country, self.baseline, self.reform - ) + with segment(SegmentName.OUTPUT_UK_LOCAL_AUTHORITY): + return simulation_output_geographic.build_uk_local_authority_impact( + self.country, self.baseline, self.reform + ) def _model_version(self) -> str: - return str(getattr(self.country_module.model, "version", "")) + with segment(SegmentName.OUTPUT_MODEL_VERSION): + return str(getattr(self.country_module.model, "version", "")) def _data_version(self) -> str: - if self.resolved_data_version: - return str(self.resolved_data_version) - data = self.simulation_params.get("data") - if isinstance(data, str) and "@" in data: - revision = data.rsplit("@", maxsplit=1)[1] - if revision: - return revision - if self.simulation_params.get("data_version"): - return 
str(self.simulation_params["data_version"]) - metadata = getattr(self.dataset, "metadata", {}) or {} - for key in ("data_version", "version"): - value = metadata.get(key) - if value is not None: - return str(value) - try: - return get_country_release_bundle(self.country).data_version - except ValueError: - pass - return "" + with segment(SegmentName.OUTPUT_DATA_VERSION): + if self.resolved_data_version: + return str(self.resolved_data_version) + data = self.simulation_params.get("data") + if isinstance(data, str) and "@" in data: + revision = data.rsplit("@", maxsplit=1)[1] + if revision: + return revision + if self.simulation_params.get("data_version"): + return str(self.simulation_params["data_version"]) + metadata = getattr(self.dataset, "metadata", {}) or {} + for key in ("data_version", "version"): + value = metadata.get(key) + if value is not None: + return str(value) + try: + return get_country_release_bundle(self.country).data_version + except ValueError: + pass + return "" diff --git a/projects/policyengine-api-simulation/src/policyengine_api_simulation/simulation_runtime.py b/projects/policyengine-api-simulation/src/policyengine_api_simulation/simulation_runtime.py index e5719f490..56220f61a 100644 --- a/projects/policyengine-api-simulation/src/policyengine_api_simulation/simulation_runtime.py +++ b/projects/policyengine-api-simulation/src/policyengine_api_simulation/simulation_runtime.py @@ -16,7 +16,10 @@ from importlib import import_module from typing import Any, Iterator +from policyengine_observability import segment, set_attribute + from policyengine_api_simulation.dataset_uri import runtime_dataset_uri +from policyengine_api_simulation.observability import SegmentName from policyengine_api_simulation.release_bundle import ( get_country_release_bundle, resolve_bundle_dataset_name, @@ -73,54 +76,54 @@ def setup_gcp_credentials() -> Iterator[None]: ``tempfile.mkstemp`` path leaked credential material on disk every time a container served a request. 
""" - # Log available GCP-related env vars for debugging - gcp_vars = { - k: v[:50] + "..." if len(v) > 50 else v - for k, v in os.environ.items() - if "GOOGLE" in k or "GCP" in k or "CREDENTIAL" in k - } - logger.info(f"GCP-related env vars: {list(gcp_vars.keys())}") - - # Check if credentials are already set as a file path - if os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"): - logger.info("GOOGLE_APPLICATION_CREDENTIALS already set") - yield - return + previous = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") + creds_file = None + try: + with segment(SegmentName.CREDENTIAL_SETUP): + # Log available GCP-related env vars for debugging. + gcp_vars = { + k: v[:50] + "..." if len(v) > 50 else v + for k, v in os.environ.items() + if "GOOGLE" in k or "GCP" in k or "CREDENTIAL" in k + } + logger.info(f"GCP-related env vars: {list(gcp_vars.keys())}") - # Check for credentials JSON in various env var names - creds_json = ( - os.environ.get("GOOGLE_APPLICATION_CREDENTIALS_JSON") - or os.environ.get("GCP_CREDENTIALS_JSON") - or os.environ.get("GOOGLE_CREDENTIALS") - or os.environ.get("SERVICE_ACCOUNT_JSON") - ) + if previous: + logger.info("GOOGLE_APPLICATION_CREDENTIALS already set") + else: + creds_json = ( + os.environ.get("GOOGLE_APPLICATION_CREDENTIALS_JSON") + or os.environ.get("GCP_CREDENTIALS_JSON") + or os.environ.get("GOOGLE_CREDENTIALS") + or os.environ.get("SERVICE_ACCOUNT_JSON") + ) + + if not creds_json: + logger.warning("No GCP credentials found in environment variables") + else: + normalized = _normalize_credentials_blob(creds_json) + # ``NamedTemporaryFile(delete=True)`` removes the file when + # the context exits. We restore any prior value of + # ``GOOGLE_APPLICATION_CREDENTIALS`` so a retry in the same + # container doesn't silently pick up a stale path. 
+ creds_file = tempfile.NamedTemporaryFile( + mode="w", + suffix=".json", + delete=True, + ) + creds_file.write(normalized) + creds_file.flush() + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_file.name + logger.info(f"GCP credentials written to {creds_file.name}") - if not creds_json: - logger.warning("No GCP credentials found in environment variables") yield - return - - normalized = _normalize_credentials_blob(creds_json) - - # ``NamedTemporaryFile(delete=True)`` removes the file when the context - # exits (either normally or via exception). We restore any prior value - # of ``GOOGLE_APPLICATION_CREDENTIALS`` so a retry in the same - # container doesn't silently pick up a path that no longer exists. - previous = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") - with tempfile.NamedTemporaryFile( - mode="w", suffix=".json", delete=True - ) as creds_file: - creds_file.write(normalized) - creds_file.flush() - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_file.name - logger.info(f"GCP credentials written to {creds_file.name}") - try: - yield - finally: + finally: + if creds_file is not None: if previous is None: os.environ.pop("GOOGLE_APPLICATION_CREDENTIALS", None) else: os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = previous + creds_file.close() def run_simulation_impl(params: dict) -> dict: @@ -188,6 +191,11 @@ def _requested_data_version(params: dict[str, Any]) -> str | None: def _resolve_dataset_reference(country: str, params: dict[str, Any]) -> str: + with segment(SegmentName.DATASET_RESOLUTION): + return _resolve_dataset_reference_inner(country, params) + + +def _resolve_dataset_reference_inner(country: str, params: dict[str, Any]) -> str: requested_data = params.get("data") requested_data = requested_data if isinstance(requested_data, str) else None requested_data_version = _requested_data_version(params) @@ -424,7 +432,9 @@ def _build_simulation( def _run_simulation_impl_core(params: dict) -> dict: - simulation_params, telemetry, metadata = 
split_internal_payload(params) + with segment(SegmentName.REQUEST_PARSE): + simulation_params, telemetry, metadata = split_internal_payload(params) + metadata = metadata or {} logger.info( "Starting simulation for country=%s run_id=%s process_id=%s", @@ -436,36 +446,50 @@ def _run_simulation_impl_core(params: dict) -> dict: logger.info("Received simulation metadata keys: %s", sorted(metadata)) country = simulation_params.get("country", "us").lower() - country_module = _country_module(country) - region_resolution = _resolve_region( - country_module=country_module, + _set_runtime_attributes( + simulation_params=simulation_params, + telemetry=telemetry, + metadata=metadata, country=country, - params=simulation_params, ) - dataset = _load_dataset( - simulation_params, - country_module=country_module, - region_resolution=region_resolution, - ) - baseline_policy = _normalise_policy(simulation_params.get("baseline")) - reform_policy = _normalise_policy(simulation_params.get("reform")) + + with segment(SegmentName.COUNTRY_MODULE_LOAD): + country_module = _country_module(country) + with segment(SegmentName.REGION_RESOLUTION): + region_resolution = _resolve_region( + country_module=country_module, + country=country, + params=simulation_params, + ) + set_attribute("region", region_resolution.code) + with segment(SegmentName.DATASET_LOAD): + dataset = _load_dataset( + simulation_params, + country_module=country_module, + region_resolution=region_resolution, + ) + with segment(SegmentName.POLICY_NORMALIZATION): + baseline_policy = _normalise_policy(simulation_params.get("baseline")) + reform_policy = _normalise_policy(simulation_params.get("reform")) logger.info("Initialising baseline and reform simulations") - baseline = _build_simulation( - simulation_params, - dataset=dataset, - policy=baseline_policy, - scoping_strategy=region_resolution.scoping_strategy, - ) - reform = _build_simulation( - simulation_params, - dataset=dataset, - policy=reform_policy, - 
scoping_strategy=region_resolution.scoping_strategy, - ) + with segment(SegmentName.SIMULATION_BUILD, simulation_kind="baseline"): + baseline = _build_simulation( + simulation_params, + dataset=dataset, + policy=baseline_policy, + scoping_strategy=region_resolution.scoping_strategy, + ) + with segment(SegmentName.SIMULATION_BUILD, simulation_kind="reform"): + reform = _build_simulation( + simulation_params, + dataset=dataset, + policy=reform_policy, + scoping_strategy=region_resolution.scoping_strategy, + ) logger.info("Calculating economic impact") - output = SimulationOutputBuilder( + builder = SimulationOutputBuilder( country=country, simulation_params=simulation_params, country_module=country_module, @@ -473,6 +497,32 @@ def _run_simulation_impl_core(params: dict) -> dict: baseline=baseline, reform=reform, resolved_data_version=_requested_data_version(simulation_params), - ).serialize() + ) + output = builder.serialize() logger.info("Comparison complete") return output + + +def _set_runtime_attributes( + *, + simulation_params: dict[str, Any], + telemetry, + metadata: dict[str, Any], + country: str, +) -> None: + set_attribute("country", country) + set_attribute("scope", simulation_params.get("scope")) + set_attribute("simulation_year", _parse_year(simulation_params)) + set_attribute("run_id", getattr(telemetry, "run_id", None)) + set_attribute("process_id", getattr(telemetry, "process_id", None)) + set_attribute("request_id", getattr(telemetry, "request_id", None)) + set_attribute("geography_code", getattr(telemetry, "geography_code", None)) + set_attribute("geography_type", getattr(telemetry, "geography_type", None)) + + set_attribute("resolved_version", metadata.get("resolved_version")) + set_attribute("resolved_app_name", metadata.get("resolved_app_name")) + bundle = metadata.get("policyengine_bundle") + if isinstance(bundle, dict): + set_attribute("policyengine_version", bundle.get("policyengine_version")) + set_attribute("model_version", 
bundle.get("model_version")) + set_attribute("data_version", bundle.get("data_version")) diff --git a/projects/policyengine-api-simulation/tests/gateway/test_auth.py b/projects/policyengine-api-simulation/tests/gateway/test_auth.py index 4a4511edb..005009dda 100644 --- a/projects/policyengine-api-simulation/tests/gateway/test_auth.py +++ b/projects/policyengine-api-simulation/tests/gateway/test_auth.py @@ -310,6 +310,42 @@ def test__given_disabled_in_dev_with_correct_ack__then_allows_and_logs( "GATEWAY AUTH IS DISABLED" in record.message for record in caplog.records ), f"Expected critical auth-disabled banner, got {caplog.records!r}" + def test__given_bypass_active__then_logfire_event_gated_on_configuration( + self, monkeypatch + ): + """The legacy Logfire audit event fires only when configure_logfire + actually ran with a token — not based on logfire's send_to_logfire + flag, which is True even on an unconfigured instance.""" + import sys + + monkeypatch.setenv(auth_module.GATEWAY_AUTH_DISABLED_ENV, "1") + monkeypatch.setenv(auth_module.MODAL_ENVIRONMENT_ENV, "dev") + monkeypatch.setenv( + auth_module.GATEWAY_AUTH_DISABLED_ACK_ENV, + auth_module.GATEWAY_AUTH_DISABLED_ACK_VALUE, + ) + + class _FakeLogfire: + def __init__(self): + self.calls = [] + + def error(self, *args, **kwargs): + self.calls.append((args, kwargs)) + + fake_logfire = _FakeLogfire() + monkeypatch.setitem(sys.modules, "logfire", fake_logfire) + + monkeypatch.setattr(auth_module, "logfire_is_configured", lambda: False) + auth_module.enforce_production_auth_guard() + assert fake_logfire.calls == [] + + monkeypatch.setattr(auth_module, "logfire_is_configured", lambda: True) + auth_module.enforce_production_auth_guard() + assert len(fake_logfire.calls) == 1 + args, kwargs = fake_logfire.calls[0] + assert args == ("gateway_auth_disabled_bypass_active",) + assert kwargs["modal_environment"] == "dev" + class TestAuthConfiguredGuard: """Startup guard for required-or-partial auth misconfiguration.""" diff 
--git a/projects/policyengine-api-simulation/tests/gateway/test_endpoints.py b/projects/policyengine-api-simulation/tests/gateway/test_endpoints.py index 82b45b8c8..6e6fa23c6 100644 --- a/projects/policyengine-api-simulation/tests/gateway/test_endpoints.py +++ b/projects/policyengine-api-simulation/tests/gateway/test_endpoints.py @@ -745,17 +745,32 @@ def test__given_submitted_job_with_telemetry__then_polling_echoes_run_id( assert response.json()["run_id"] == "run-123" def test__given_unknown_job_id__then_polling_returns_404( - self, mock_modal, client: TestClient + self, mock_modal, client: TestClient, monkeypatch ): """ Given a job id that the gateway never issued When polling job status Then the gateway returns 404 before asking Modal for a call result. """ + from src.modal.gateway import endpoints as endpoints_module + + recorded_errors = [] + monkeypatch.setattr( + endpoints_module, + "record_error", + lambda exc, **kwargs: recorded_errors.append((exc, kwargs)), + ) + response = client.get("/jobs/unknown-job-id") assert response.status_code == 404 assert response.json()["detail"] == "Job not found: unknown-job-id" + assert str(recorded_errors[0][0]) == "Job not found: unknown-job-id" + assert recorded_errors[0][1] == { + "handled": True, + "status_code": 404, + "include_stack": False, + } def test__given_lazy_modal_call_without_metadata__then_polling_returns_404( self, mock_modal, client: TestClient @@ -903,6 +918,29 @@ def test__given_active_state__then_policyengine_versions_endpoint_works( assert response.status_code == 200 assert response.json()["latest"] == "4.10.0" + def test__given_unknown_version_kind__then_records_handled_404( + self, mock_modal, client: TestClient, monkeypatch + ): + from src.modal.gateway import endpoints as endpoints_module + + recorded_errors = [] + monkeypatch.setattr( + endpoints_module, + "record_error", + lambda exc, **kwargs: recorded_errors.append((exc, kwargs)), + ) + + response = client.get("/versions/fr") + + assert 
response.status_code == 404 + assert response.json()["detail"] == "Unknown version kind: fr" + assert str(recorded_errors[0][0]) == "Unknown version kind: fr" + assert recorded_errors[0][1] == { + "handled": True, + "status_code": 404, + "include_stack": False, + } + def test__given_no_active_state__then_versions_fall_back_to_old_dicts( self, mock_modal, client: TestClient ): @@ -978,6 +1016,95 @@ def test__given_budget_window_submission__then_returns_parent_batch_job_id( "policyengine_bundle": expected_bundle("us", "1.500.0"), } + def test__given_unknown_budget_window_job__then_records_handled_404( + self, mock_modal, client: TestClient, monkeypatch + ): + from src.modal.gateway import endpoints as endpoints_module + + recorded_errors = [] + monkeypatch.setattr( + endpoints_module, + "record_error", + lambda exc, **kwargs: recorded_errors.append((exc, kwargs)), + ) + + response = client.get("/budget-window-jobs/missing-batch") + + assert response.status_code == 404 + assert response.json()["detail"] == ( + "Budget-window job not found: missing-batch" + ) + assert str(recorded_errors[0][0]) == ( + "Budget-window job not found: missing-batch" + ) + assert recorded_errors[0][1] == { + "handled": True, + "status_code": 404, + "include_stack": False, + } + + def test__given_parent_lookup_failure__then_degraded_poll_is_not_an_error( + self, mock_modal, client: TestClient, monkeypatch + ): + """ + Given a submitted batch whose parent FunctionCall lookup fails + When polling batch status + Then the gateway serves the seed state (202) and records a degradation + event — NOT a request error, which would emit http_request_failed and + error metrics contradicting the successful response. 
+ """ + from fixtures.gateway.test_endpoints import MockFunctionCall + from src.modal.gateway import endpoints as endpoints_module + + recorded_errors = [] + recorded_events = [] + monkeypatch.setattr( + endpoints_module, + "record_error", + lambda exc, **kwargs: recorded_errors.append((exc, kwargs)), + ) + monkeypatch.setattr( + endpoints_module, + "record_event", + lambda event, **kwargs: recorded_events.append((event, kwargs)), + ) + + mock_modal["dicts"]["simulation-api-us-versions"] = { + "latest": "1.500.0", + "1.500.0": "policyengine-simulation-py4-10-0", + } + submit_response = client.post( + "/simulate/economy/budget-window", + json={ + "country": "us", + "region": "us", + "scope": "macro", + "reform": {}, + "start_year": "2026", + "window_size": 3, + }, + ) + batch_job_id = submit_response.json()["batch_job_id"] + MockFunctionCall.from_id_errors[batch_job_id] = RuntimeError( + "transient modal control-plane error" + ) + + response = client.get(f"/budget-window-jobs/{batch_job_id}") + + assert response.status_code == 202 + assert response.json()["status"] == "submitted" + assert recorded_errors == [] + assert recorded_events == [ + ( + "budget_window_parent_lookup_degraded", + { + "batch_job_id": batch_job_id, + "error_type": "RuntimeError", + "parent_call_not_found": False, + }, + ) + ] + def test__given_budget_window_include_cliffs__then_returns_422( self, mock_modal, client: TestClient ): diff --git a/projects/policyengine-api-simulation/tests/gateway/test_errors.py b/projects/policyengine-api-simulation/tests/gateway/test_errors.py index f7d19108d..755f26eae 100644 --- a/projects/policyengine-api-simulation/tests/gateway/test_errors.py +++ b/projects/policyengine-api-simulation/tests/gateway/test_errors.py @@ -18,9 +18,21 @@ def __init__(self): def exception(self, *args, **kwargs): self.calls.append((args, kwargs)) - fake = _FakeLogfire() - monkeypatch.setattr(errors_module, "_logfire", fake) - monkeypatch.setattr(errors_module, 
"_logfire_is_configured", lambda: True) + recorded_errors = [] + recorded_events = [] + fake_logfire = _FakeLogfire() + monkeypatch.setattr( + errors_module, + "record_error", + lambda exc, **kwargs: recorded_errors.append((exc, kwargs)), + ) + monkeypatch.setattr( + errors_module, + "record_event", + lambda event, **kwargs: recorded_events.append((event, kwargs)), + ) + monkeypatch.setattr(errors_module, "_logfire", fake_logfire) + monkeypatch.setattr(errors_module, "logfire_is_configured", lambda: True) exc = RuntimeError( "Signed GCS URL https://storage.googleapis.com/foo?token=SECRET " @@ -37,16 +49,74 @@ def exception(self, *args, **kwargs): assert "token=" not in message assert message.startswith("Simulation failed") - assert len(fake.calls) == 1 - # Correlation id must appear in the server-side structured log. - _, kwargs = fake.calls[0] + assert recorded_errors == [ + ( + exc, + { + "handled": True, + "status_code": 500, + }, + ) + ] + assert len(recorded_events) == 1 + # Correlation id must appear in the server-side structured event. 
+ event, kwargs = recorded_events[0] + assert event == "gateway_error_redacted" assert kwargs["correlation_id"] == match.group(1) assert kwargs["scope"] == "test_scope" assert kwargs["job_id"] == "abc" + assert kwargs["error_type"] == "RuntimeError" + assert kwargs["logfire_status"] == "legacy_candidate_for_replacement" + assert kwargs["logfire_replacement_candidate"] == "policyengine-observability" + + assert len(fake_logfire.calls) == 1 + _, logfire_kwargs = fake_logfire.calls[0] + assert logfire_kwargs["correlation_id"] == match.group(1) + assert logfire_kwargs["scope"] == "test_scope" + assert logfire_kwargs["job_id"] == "abc" + assert logfire_kwargs["logfire_status"] == "legacy_candidate_for_replacement" + assert logfire_kwargs["logfire_replacement_candidate"] == ( + "policyengine-observability" + ) + + +def test_log_and_redact_exception_always_logs_to_stdlib(monkeypatch, caplog): + """The stdlib log line must fire even when every structured sink no-ops. + + record_error/record_event silently do nothing on a disabled runtime and + Logfire is unconfigured here, so without the unconditional stdlib log the + correlation id handed to the caller would point at nothing server-side. + """ + monkeypatch.setattr(errors_module, "record_error", lambda *a, **k: None) + monkeypatch.setattr(errors_module, "record_event", lambda *a, **k: None) + monkeypatch.setattr(errors_module, "logfire_is_configured", lambda: False) + exc = ValueError("secret-parameter-name") + with caplog.at_level("ERROR", logger="src.modal.gateway.errors"): + message = errors_module.log_and_redact_exception(exc, scope="fallback") + + match = CORRELATION_RE.search(message) + assert match is not None, message + assert "secret-parameter-name" not in message + assert message.startswith("Simulation failed") + + # Exactly one guaranteed server-side record carrying the correlation id + # and the full exception (message + stack) for operators. 
+ stdlib_records = [ + record for record in caplog.records if "fallback" in record.getMessage() + ] + assert len(stdlib_records) == 1 + record = stdlib_records[0] + assert match.group(1) in record.getMessage() + assert record.exc_info is not None + assert record.exc_info[1] is exc + +def test_log_and_redact_exception_survives_structured_sink_failure(monkeypatch, caplog): + def _raise(*args, **kwargs): + raise RuntimeError("observability failed") -def test_log_and_redact_exception_falls_back_to_stdlib_logger(monkeypatch, caplog): - monkeypatch.setattr(errors_module, "_logfire", None) + monkeypatch.setattr(errors_module, "record_error", _raise) + monkeypatch.setattr(errors_module, "logfire_is_configured", lambda: False) exc = ValueError("secret-parameter-name") with caplog.at_level("ERROR", logger="src.modal.gateway.errors"): message = errors_module.log_and_redact_exception(exc, scope="fallback") diff --git a/projects/policyengine-api-simulation/tests/test_budget_window_batch.py b/projects/policyengine-api-simulation/tests/test_budget_window_batch.py index 37214426f..5f3f082af 100644 --- a/projects/policyengine-api-simulation/tests/test_budget_window_batch.py +++ b/projects/policyengine-api-simulation/tests/test_budget_window_batch.py @@ -350,6 +350,72 @@ def test_scheduler_sleep_exponentially_backs_off_then_resets_on_progress( assert sleeps == [0.5, 1.0, 2.0, 4.0] +def test_scheduler_publishes_bounded_poll_aggregates(monkeypatch, mock_batch_modal): + """Poll/sleep telemetry must be published as overwriting aggregate + attributes, not one segment per probe: segments append nodes to the + operation's in-memory segment tree, so a near-timeout batch polling for + an hour would grow memory without bound and emit a final operation log + line large enough for Modal's log pipeline to truncate. 
+ """ + + request, payload = _build_parent_payload(window_size=1) + _seed_parent_batch(request, mock_batch_modal["parent_call_id"]) + + tracker = SpawnTracker() + run_simulation = MockRunSimulationFunction( + tracker=tracker, + results_by_year={ + "2026": [ + TimeoutError(), + TimeoutError(), + { + "budget": { + "tax_revenue_impact": 10, + "state_tax_revenue_impact": 3, + "benefit_spending_impact": 5, + "budgetary_impact": 15, + } + }, + ], + }, + call_registry=mock_batch_modal["call_registry"], + ) + mock_batch_modal["functions"][ + ("policyengine-simulation-py4-10-0", "run_simulation") + ] = run_simulation + + monkeypatch.setattr(scheduler_module.time, "sleep", lambda _: None) + attributes: dict[str, object] = {} + monkeypatch.setattr( + scheduler_module, + "set_attribute", + lambda key, value: attributes.__setitem__(key, value), + ) + + runner = scheduler_module.BudgetWindowBatchRunner( + context=scheduler_module.BudgetWindowBatchContext( + batch_job_id=mock_batch_modal["parent_call_id"], + request=request, + resolved_version="1.500.0", + resolved_app_name="policyengine-simulation-py4-10-0", + bundle=PolicyEngineBundle(model_version="1.500.0"), + raw_params=payload, + ), + poll_interval_seconds=0.5, + poll_interval_max_seconds=4.0, + poll_interval_backoff_factor=2.0, + ) + + runner.run() + + # Two TimeoutError probes plus the resolving poll. + assert attributes["child_poll_count"] == 3 + assert attributes["child_poll_ms_total"] >= 0 + # Two empty polls: 0.5s + 1.0s of backoff sleep. 
+ assert attributes["backoff_sleep_count"] == 2 + assert attributes["backoff_sleep_ms_total"] == 1500.0 + + def test_run_budget_window_batch_impl_fails_on_malformed_child_result( mock_batch_modal, ): diff --git a/projects/policyengine-api-simulation/tests/test_logfire_integration.py b/projects/policyengine-api-simulation/tests/test_logfire_integration.py deleted file mode 100644 index 23081f4a0..000000000 --- a/projects/policyengine-api-simulation/tests/test_logfire_integration.py +++ /dev/null @@ -1,219 +0,0 @@ -"""Tests for Logfire integration logic.""" - -from unittest.mock import MagicMock - -pytest_plugins = ("fixtures.test_logfire_integration",) - - -class TestConfigureLogfireLogic: - """Tests for the configure_logfire behavior.""" - - def test_configure_logfire_skips_without_token(self): - """ - Given no LOGFIRE_TOKEN environment variable - When configure_logfire logic runs - Then logfire.configure should not be called. - """ - # Given - mock_logfire = MagicMock() - env = {} - - # When - replicate the configure_logfire logic - token = env.get("LOGFIRE_TOKEN", "") - if token: - mock_logfire.configure( - service_name="test", - token=token, - environment=env.get("LOGFIRE_ENVIRONMENT", "production"), - console=False, - ) - - # Then - mock_logfire.configure.assert_not_called() - - def test_configure_logfire_calls_configure_with_token(self): - """ - Given a LOGFIRE_TOKEN environment variable - When configure_logfire logic runs - Then logfire.configure is called with correct parameters. 
- """ - # Given - mock_logfire = MagicMock() - env = { - "LOGFIRE_TOKEN": "test-token-123", - "LOGFIRE_ENVIRONMENT": "staging", - } - service_name = "test-service" - - # When - replicate the configure_logfire logic - token = env.get("LOGFIRE_TOKEN", "") - if token: - mock_logfire.configure( - service_name=service_name, - token=token, - environment=env.get("LOGFIRE_ENVIRONMENT", "production"), - console=False, - ) - - # Then - mock_logfire.configure.assert_called_once_with( - service_name="test-service", - token="test-token-123", - environment="staging", - console=False, - ) - - def test_configure_logfire_uses_default_environment(self): - """ - Given a LOGFIRE_TOKEN but no LOGFIRE_ENVIRONMENT - When configure_logfire logic runs - Then the default environment 'production' is used. - """ - # Given - mock_logfire = MagicMock() - env = { - "LOGFIRE_TOKEN": "test-token-456", - } - - # When - replicate the configure_logfire logic - token = env.get("LOGFIRE_TOKEN", "") - if token: - mock_logfire.configure( - service_name="policyengine-simulation", - token=token, - environment=env.get("LOGFIRE_ENVIRONMENT", "production"), - console=False, - ) - - # Then - mock_logfire.configure.assert_called_once() - call_kwargs = mock_logfire.configure.call_args[1] - assert call_kwargs["environment"] == "production" - - def test_configure_logfire_disables_console(self): - """ - Given any configuration - When configure_logfire logic runs - Then console output is disabled. 
- """ - # Given - mock_logfire = MagicMock() - env = {"LOGFIRE_TOKEN": "test-token"} - - # When - token = env.get("LOGFIRE_TOKEN", "") - if token: - mock_logfire.configure( - service_name="test", - token=token, - environment=env.get("LOGFIRE_ENVIRONMENT", "production"), - console=False, - ) - - # Then - call_kwargs = mock_logfire.configure.call_args[1] - assert call_kwargs["console"] is False - - -class TestLogfireSpanPattern: - """Tests for the Logfire span usage pattern in run_simulation.""" - - def test_span_receives_input_params(self, mock_logfire, mock_span): - """ - Given simulation parameters - When a span is created - Then input_params are passed as span attributes. - """ - # Given - params = {"country": "us", "reform": {"test": True}} - - # When - replicate the run_simulation span pattern - with mock_logfire.span("run_simulation", input_params=params): - pass - - # Then - mock_logfire.span.assert_called_once_with("run_simulation", input_params=params) - - def test_span_captures_output_result(self, mock_logfire, mock_span): - """ - Given a successful simulation - When the span completes - Then output_result attribute is set. - """ - # Given - params = {"country": "uk"} - result = {"budget": {"total": 1000000}} - - # When - replicate the run_simulation span pattern - with mock_logfire.span("run_simulation", input_params=params) as span: - span.set_attribute("output_result", result) - - # Then - mock_span.set_attribute.assert_called_once_with("output_result", result) - - def test_force_flush_called_after_span(self, mock_logfire, mock_span): - """ - Given any simulation execution - When run_simulation completes - Then logfire.force_flush is called. 
- """ - # Given - params = {} - - # When - replicate the run_simulation pattern - try: - with mock_logfire.span("run_simulation", input_params=params): - pass - finally: - mock_logfire.force_flush() - - # Then - mock_logfire.force_flush.assert_called_once() - - def test_force_flush_called_even_on_exception(self, mock_logfire, mock_span): - """ - Given a simulation that raises an exception - When run_simulation fails - Then logfire.force_flush is still called. - """ - # Given - params = {} - exception_raised = False - - # When - replicate the run_simulation pattern with exception - try: - with mock_logfire.span("run_simulation", input_params=params): - raise ValueError("Simulation failed") - except ValueError: - exception_raised = True - finally: - mock_logfire.force_flush() - - # Then - assert exception_raised - mock_logfire.force_flush.assert_called_once() - - def test_result_is_returned_from_span(self, mock_logfire, mock_span): - """ - Given a successful simulation - When run_simulation completes - Then the result is returned. 
- """ - # Given - params = {"country": "us"} - expected_result = {"budget": {"total": 500000}} - - # When - replicate the full pattern - def run_simulation_impl(p): - return expected_result - - result = None - try: - with mock_logfire.span("run_simulation", input_params=params) as span: - result = run_simulation_impl(params) - span.set_attribute("output_result", result) - finally: - mock_logfire.force_flush() - - # Then - assert result == expected_result diff --git a/projects/policyengine-api-simulation/tests/test_logfire_legacy.py b/projects/policyengine-api-simulation/tests/test_logfire_legacy.py new file mode 100644 index 000000000..5634c73a8 --- /dev/null +++ b/projects/policyengine-api-simulation/tests/test_logfire_legacy.py @@ -0,0 +1,123 @@ +"""Tests for legacy Logfire helper behavior.""" + +from __future__ import annotations + +import sys +from contextlib import nullcontext + +import pytest + +from src.modal import logfire_legacy + + +@pytest.fixture(autouse=True) +def _reset_logfire_configured_flag(monkeypatch): + """Isolate the module-level configured flag between tests.""" + monkeypatch.setattr(logfire_legacy, "_logfire_configured", False) + + +class _FakeLogfire: + def __init__(self) -> None: + self.configure_calls = [] + self.span_calls = [] + self.flush_calls = 0 + self.span_context = nullcontext("span") + + def configure(self, **kwargs): + self.configure_calls.append(kwargs) + + def span(self, name, **kwargs): + self.span_calls.append((name, kwargs)) + return self.span_context + + def force_flush(self): + self.flush_calls += 1 + + +def test_configure_logfire_skips_without_token(monkeypatch): + fake_logfire = _FakeLogfire() + monkeypatch.delenv("LOGFIRE_TOKEN", raising=False) + monkeypatch.setitem(sys.modules, "logfire", fake_logfire) + + assert logfire_legacy.configure_logfire("policyengine-simulation") is False + assert fake_logfire.configure_calls == [] + + +def test_configure_logfire_uses_token_and_environment(monkeypatch): + fake_logfire = 
_FakeLogfire() + monkeypatch.setenv("LOGFIRE_TOKEN", "token-123") + monkeypatch.setenv("LOGFIRE_ENVIRONMENT", "staging") + monkeypatch.setenv("MODAL_ENVIRONMENT", "main") + monkeypatch.setitem(sys.modules, "logfire", fake_logfire) + + assert logfire_legacy.configure_logfire("policyengine-simulation") is True + assert fake_logfire.configure_calls == [ + { + "service_name": "policyengine-simulation", + "token": "token-123", + "environment": "staging", + "console": False, + } + ] + + +def test_configure_logfire_falls_back_to_default_environment(monkeypatch): + fake_logfire = _FakeLogfire() + monkeypatch.setenv("LOGFIRE_TOKEN", "token-123") + monkeypatch.delenv("LOGFIRE_ENVIRONMENT", raising=False) + monkeypatch.setenv("MODAL_ENVIRONMENT", "main") + monkeypatch.setitem(sys.modules, "logfire", fake_logfire) + + logfire_legacy.configure_logfire( + "policyengine-simulation", + default_environment="development", + ) + + assert fake_logfire.configure_calls[0]["environment"] == "development" + + +def test_logfire_is_configured_tracks_configure_state(monkeypatch): + """logfire_is_configured must reflect whether a token-backed configure ran. + + Logfire's own ``config.send_to_logfire`` defaults to True on an + unconfigured instance, so this module flag is the only reliable signal. 
+ """ + fake_logfire = _FakeLogfire() + monkeypatch.setitem(sys.modules, "logfire", fake_logfire) + + monkeypatch.delenv("LOGFIRE_TOKEN", raising=False) + assert logfire_legacy.configure_logfire("policyengine-simulation") is False + assert logfire_legacy.logfire_is_configured() is False + + monkeypatch.setenv("LOGFIRE_TOKEN", "token-123") + assert logfire_legacy.configure_logfire("policyengine-simulation") is True + assert logfire_legacy.logfire_is_configured() is True + + monkeypatch.delenv("LOGFIRE_TOKEN", raising=False) + assert logfire_legacy.configure_logfire("policyengine-simulation") is False + assert logfire_legacy.logfire_is_configured() is False + + +def test_logfire_span_noops_when_disabled(): + with logfire_legacy.logfire_span(False, "run_simulation") as span: + assert span is None + + +def test_logfire_span_delegates_when_enabled(monkeypatch): + fake_logfire = _FakeLogfire() + monkeypatch.setitem(sys.modules, "logfire", fake_logfire) + + with logfire_legacy.logfire_span(True, "run_simulation", country="us") as span: + assert span == "span" + + assert fake_logfire.span_calls == [("run_simulation", {"country": "us"})] + + +def test_flush_logfire_only_flushes_when_enabled(monkeypatch): + fake_logfire = _FakeLogfire() + monkeypatch.setitem(sys.modules, "logfire", fake_logfire) + + logfire_legacy.flush_logfire(False) + logfire_legacy.flush_logfire(True) + + assert fake_logfire.flush_calls == 1 diff --git a/projects/policyengine-api-simulation/tests/test_modal_bundle_image.py b/projects/policyengine-api-simulation/tests/test_modal_bundle_image.py index d6e03aafb..a41227f68 100644 --- a/projects/policyengine-api-simulation/tests/test_modal_bundle_image.py +++ b/projects/policyengine-api-simulation/tests/test_modal_bundle_image.py @@ -59,6 +59,7 @@ def install_fake_modal(monkeypatch): modal.Secret = FakeSecret modal.App = FakeApp modal.is_local = lambda: True + modal.asgi_app = lambda: lambda function: function monkeypatch.setitem(sys.modules, "modal", modal) 
@@ -87,6 +88,14 @@ def test_modal_image_uses_policyengine_bundle_install(monkeypatch): "/.policyengine-bundle-receipt.json" ) assert command_calls[0][2]["secrets"] == [app.data_secret, app.hf_secret] + pip_install_calls = [ + call for call in app.simulation_image.calls if call[0] == "pip_install" + ] + assert pip_install_calls + packages = pip_install_calls[0][1] + assert "policyengine-observability[fastapi]>=1.3.0,<2" in packages + assert "logfire>=3.0.0" in packages + runtime_secret_sets = { name: kwargs["secrets"] for name, kwargs in app.app.function_calls } @@ -140,3 +149,24 @@ def test_modal_image_prebuilds_datasets_between_env_and_local_source(monkeypatch if call[0] == "run_function" and call[1] == "snapshot_models" ) assert env_index < prebuild_indices[0] < local_source_index < snapshot_index + + +def test_gateway_image_installs_dual_observability(monkeypatch): + install_fake_modal(monkeypatch) + sys.modules.pop("src.modal.gateway.app", None) + + app = importlib.import_module("src.modal.gateway.app") + + pip_install_calls = [ + call for call in app.gateway_image.calls if call[0] == "pip_install" + ] + assert pip_install_calls + packages = pip_install_calls[0][1] + assert "policyengine-observability[fastapi]>=1.3.0,<2" in packages + assert "logfire>=3.0.0" in packages + + function_kwargs = {name: kwargs for name, kwargs in app.app.function_calls} + assert function_kwargs["web_app"]["secrets"] == [ + app.gateway_auth_secret, + app.logfire_secret, + ] diff --git a/projects/policyengine-api-simulation/tests/test_observability.py b/projects/policyengine-api-simulation/tests/test_observability.py new file mode 100644 index 000000000..ede9b16b8 --- /dev/null +++ b/projects/policyengine-api-simulation/tests/test_observability.py @@ -0,0 +1,190 @@ +import json +import os + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from policyengine_observability import ( + REQUEST_ID_HEADER, + ObservabilityRuntime, + operation, + 
set_observability_runtime, +) +from policyengine_observability.runtime import OPERATION_LOGGER, REQUEST_LOGGER + +from policyengine_api_simulation.observability import ( + LOG_DESTINATIONS, + SERVICE_NAME, + configure_process_observability, + init_process_observability, + init_simulation_observability, + process_static_attributes, +) + + +OBSERVABILITY_ENV_KEYS = ( + "OBSERVABILITY_PLATFORM", + "OBSERVABILITY_SERVICE_ROLE", + "OBSERVABILITY_RUNTIME_ROLE", + "OBSERVABILITY_MODAL_APP_NAME", + "OBSERVABILITY_MODAL_FUNCTION_NAME", + "OBSERVABILITY_LOG_DESTINATIONS", + "OTEL_ENABLED", + "GOOGLE_CLOUD_PROJECT", + "MODAL_ENVIRONMENT", +) + + +@pytest.fixture(autouse=True) +def reset_observability_runtime(): + saved_env = {key: os.environ.get(key) for key in OBSERVABILITY_ENV_KEYS} + for key in OBSERVABILITY_ENV_KEYS: + os.environ.pop(key, None) + yield + set_observability_runtime(ObservabilityRuntime.disabled()) + for key, value in saved_env.items(): + if value is None: + os.environ.pop(key, None) + else: + os.environ[key] = value + + +def test_configure_process_observability_sets_modal_metadata(): + configure_process_observability( + platform="modal", + service_role="simulation_worker", + runtime_role="worker", + modal_app_name="policyengine-simulation-py4-19-1", + modal_function_name="run_simulation", + ) + + assert os.environ["OBSERVABILITY_PLATFORM"] == "modal" + assert os.environ["OBSERVABILITY_SERVICE_ROLE"] == "simulation_worker" + assert os.environ["OBSERVABILITY_RUNTIME_ROLE"] == "worker" + assert ( + os.environ["OBSERVABILITY_MODAL_APP_NAME"] == "policyengine-simulation-py4-19-1" + ) + assert os.environ["OBSERVABILITY_MODAL_FUNCTION_NAME"] == "run_simulation" + + +def test_configure_process_observability_overwrites_stale_metadata(): + os.environ["OBSERVABILITY_SERVICE_ROLE"] = "old_role" + os.environ["OBSERVABILITY_MODAL_FUNCTION_NAME"] = "old_function" + + configure_process_observability( + platform="modal", + service_role="budget_window_worker", + 
modal_app_name="policyengine-simulation-py4-19-1", + modal_function_name="run_budget_window_batch", + ) + + assert os.environ["OBSERVABILITY_SERVICE_ROLE"] == "budget_window_worker" + assert os.environ["OBSERVABILITY_RUNTIME_ROLE"] == "budget_window_worker" + assert os.environ["OBSERVABILITY_MODAL_FUNCTION_NAME"] == "run_budget_window_batch" + + +def test_process_static_attributes_carries_modal_identity(): + os.environ["MODAL_ENVIRONMENT"] = "main" + configure_process_observability( + platform="modal", + service_role="simulation_worker", + modal_app_name="policyengine-simulation-py4-19-1", + modal_function_name="run_simulation", + ) + + attributes = process_static_attributes(service_role="simulation_worker") + + assert attributes["platform"] == "modal" + assert attributes["runtime_role"] == "simulation_worker" + assert attributes["modal_environment"] == "main" + assert attributes["modal_app_name"] == "policyengine-simulation-py4-19-1" + assert attributes["modal_function_name"] == "run_simulation" + assert attributes["logfire_status"] == "legacy_candidate_for_replacement" + + +def test_worker_operation_log_includes_modal_identity(monkeypatch): + """Replicates the Modal worker entrypoint sequence: the operation log + must carry the Modal identity attributes, which have no FastAPI-adapter + injection point in plain-process runtimes.""" + configure_process_observability( + platform="modal", + service_role="simulation_worker", + modal_app_name="policyengine-simulation-py4-19-1", + modal_function_name="run_simulation", + ) + init_process_observability(service_role="simulation_worker") + + records = [] + monkeypatch.setattr( + OPERATION_LOGGER, + "info", + lambda message: records.append(json.loads(message)), + ) + + static_attributes = process_static_attributes(service_role="simulation_worker") + with operation("run_simulation", flavor="modal_function", **static_attributes): + pass + + assert len(records) == 1 + record = records[0] + assert record["event"] == 
"operation_completed" + assert record["operation"] == "run_simulation" + assert record["service_role"] == "simulation_worker" + assert record["platform"] == "modal" + assert record["runtime_role"] == "simulation_worker" + assert record["modal_app_name"] == "policyengine-simulation-py4-19-1" + assert record["modal_function_name"] == "run_simulation" + + +def test_init_simulation_observability_forces_stdout_and_disables_exports(): + os.environ["OTEL_ENABLED"] = "true" + os.environ["OBSERVABILITY_LOG_DESTINATIONS"] = "google_cloud_logging" + os.environ["GOOGLE_CLOUD_PROJECT"] = "policyengine-prod" + os.environ["MODAL_ENVIRONMENT"] = "staging" + + app = FastAPI() + runtime = init_simulation_observability(app, service_role="modal_gateway") + + assert app.state.policyengine_observability is runtime + assert runtime.config.service_name == SERVICE_NAME + assert runtime.config.service_role == "modal_gateway" + assert runtime.config.environment == "staging" + assert runtime.config.log_destinations == LOG_DESTINATIONS + assert runtime.config.otel_enabled is False + assert runtime.config.google_cloud_project is None + + +def test_fastapi_observability_emits_structured_request_log(monkeypatch): + app = FastAPI() + + @app.get("/health") + def health(): + return {"status": "healthy"} + + init_simulation_observability(app, service_role="api") + + records = [] + monkeypatch.setattr( + REQUEST_LOGGER, + "info", + lambda message: records.append(json.loads(message)), + ) + + response = TestClient(app).get( + "/health", + headers={REQUEST_ID_HEADER: "request-123"}, + ) + + assert response.status_code == 200 + assert len(records) == 1 + record = records[0] + assert record["schema_version"] == "policyengine.observability.request.v1" + assert record["event"] == "http_request_completed" + assert record["service_name"] == SERVICE_NAME + assert record["service_role"] == "api" + assert record["request_id"] == "request-123" + assert record["method"] == "GET" + assert record["route"] == 
"/health" + assert record["status_code"] == 200 + assert record["logfire_status"] == "legacy_candidate_for_replacement" + assert record["logfire_replacement_candidate"] == "policyengine-observability" diff --git a/projects/policyengine-api-simulation/tests/test_simulation_output_builder.py b/projects/policyengine-api-simulation/tests/test_simulation_output_builder.py index 026481c8e..f34a9523b 100644 --- a/projects/policyengine-api-simulation/tests/test_simulation_output_builder.py +++ b/projects/policyengine-api-simulation/tests/test_simulation_output_builder.py @@ -7,6 +7,11 @@ import pandas as pd import pytest +from policyengine_observability import ( + ObservabilityConfig, + ObservabilityRuntime, + set_observability_runtime, +) from fixtures.test_simulation_api_contracts import ( CURRENT_SINGLE_YEAR_MACRO_KEYS, @@ -25,19 +30,28 @@ ) from policyengine_api_simulation.release_bundle import BUNDLE_RECEIPT_FILENAME from policyengine_api_simulation.release_bundle import get_country_release_bundle +from policyengine_api_simulation.observability import SegmentName from policyengine_api_simulation.simulation_runtime import RegionResolution from policyengine_api_simulation.simulation_runtime import _load_dataset from policyengine_api_simulation.simulation_runtime import _normalise_policy from policyengine_api_simulation.simulation_runtime import _resolve_dataset_reference from policyengine_api_simulation.simulation_runtime import _resolve_region from policyengine_api_simulation.simulation_runtime import _run_simulation_impl_core +from policyengine_api_simulation.simulation_runtime import run_simulation_impl from policyengine_api_simulation.simulation_macro_output import ( + AgePovertyOutput, + BaselineReformValue, BudgetaryImpact, BudgetaryOutput, DecileOutput, DetailedBudgetOutput, GeographicImpactOutput, + GenderPovertyOutput, + InequalityOutput, IntraDecileOutput, + LaborSupplyResponseOutput, + PovertyByGenderOutput, + PovertyModuleOutputs, PovertyOutput, 
SingleYearMacroOutput, ) @@ -59,6 +73,32 @@ def ensure(self): raise AssertionError("test data is already materialized") +def _with_observability_timings(callback): + runtime = ObservabilityRuntime( + ObservabilityConfig( + service_name="policyengine-api-simulation-test", + service_role="test", + environment="test", + otel_enabled=False, + ), + segment_registry=SegmentName, + ) + set_observability_runtime(runtime) + handle = runtime.start_operation("test_operation", flavor="unit") + try: + result = callback() + operation = handle["operation"] + return ( + result, + dict(operation.timings_ms), + dict(operation.timing_counts), + [node.as_dict() for node in operation.segment_tree], + ) + finally: + runtime.end_operation(handle) + set_observability_runtime(ObservabilityRuntime.disabled()) + + def _macro_baseline_reform(): baseline = _FakeSimulation( pd.DataFrame( @@ -218,6 +258,331 @@ def test_builder_returns_existing_single_year_macro_shape(monkeypatch): assert output == CURRENT_SINGLE_YEAR_MACRO_RESULT +def test_run_simulation_impl_records_runtime_timings_without_real_calculation( + monkeypatch, +): + dataset = object() + country_module = SimpleNamespace(model=SimpleNamespace(version="1.715.2")) + baseline_simulation = object() + reform_simulation = object() + build_calls = [] + + def fake_build_simulation(params, *, dataset, policy, scoping_strategy=None): + build_calls.append((params, dataset, policy, scoping_strategy)) + return baseline_simulation if len(build_calls) == 1 else reform_simulation + + class FakeSimulationOutputBuilder: + def __init__(self, **kwargs): + self.kwargs = kwargs + + def serialize(self): + return CURRENT_SINGLE_YEAR_MACRO_RESULT + + monkeypatch.delenv("GOOGLE_APPLICATION_CREDENTIALS", raising=False) + monkeypatch.delenv("GOOGLE_APPLICATION_CREDENTIALS_JSON", raising=False) + monkeypatch.delenv("GCP_CREDENTIALS_JSON", raising=False) + monkeypatch.delenv("GOOGLE_CREDENTIALS", raising=False) + monkeypatch.delenv("SERVICE_ACCOUNT_JSON", 
raising=False) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_runtime._country_module", + lambda country: country_module, + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_runtime._resolve_region", + lambda **kwargs: RegionResolution( + code="us", + dataset_reference="mock-dataset", + scoping_strategy="mock-scoping", + ), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_runtime._load_dataset", + lambda params, country_module, region_resolution: dataset, + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_runtime._build_simulation", + fake_build_simulation, + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_runtime.SimulationOutputBuilder", + FakeSimulationOutputBuilder, + ) + + result, timings, counts, segment_tree = _with_observability_timings( + lambda: run_simulation_impl( + { + "country": "us", + "baseline": {"gov.test.parameter": {"2026-01-01": 1}}, + "reform": {"gov.test.parameter": {"2026-01-01": 2}}, + } + ) + ) + + assert result == CURRENT_SINGLE_YEAR_MACRO_RESULT + assert set(timings) >= { + SegmentName.CREDENTIAL_SETUP, + SegmentName.REQUEST_PARSE, + SegmentName.COUNTRY_MODULE_LOAD, + SegmentName.REGION_RESOLUTION, + SegmentName.DATASET_LOAD, + SegmentName.POLICY_NORMALIZATION, + SegmentName.SIMULATION_BUILD, + } + assert counts[SegmentName.SIMULATION_BUILD] == 2 + assert [node["name"] for node in segment_tree] == [ + SegmentName.CREDENTIAL_SETUP, + SegmentName.REQUEST_PARSE, + SegmentName.COUNTRY_MODULE_LOAD, + SegmentName.REGION_RESOLUTION, + SegmentName.DATASET_LOAD, + SegmentName.POLICY_NORMALIZATION, + SegmentName.SIMULATION_BUILD, + SegmentName.SIMULATION_BUILD, + ] + simulation_builds = [ + node for node in segment_tree if node["name"] == SegmentName.SIMULATION_BUILD + ] + assert [node["attrs"] for node in simulation_builds] == [ + {"simulation_kind": "baseline"}, + {"simulation_kind": "reform"}, + ] + assert build_calls == [ + ( + { + "country": "us", + 
"baseline": {"gov.test.parameter": {"2026-01-01": 1}}, + "reform": {"gov.test.parameter": {"2026-01-01": 2}}, + }, + dataset, + {"gov.test.parameter": {"2026-01-01": 1}}, + "mock-scoping", + ), + ( + { + "country": "us", + "baseline": {"gov.test.parameter": {"2026-01-01": 1}}, + "reform": {"gov.test.parameter": {"2026-01-01": 2}}, + }, + dataset, + {"gov.test.parameter": {"2026-01-01": 2}}, + "mock-scoping", + ), + ] + + +def test_builder_records_output_timings_without_real_calculation(monkeypatch): + baseline = object() + reform = object() + analysis = SimpleNamespace( + wealth_decile_impacts=object(), + intra_wealth_decile_impacts=object(), + ) + analysis_calls = [] + + def economic_impact_analysis( + baseline_simulation, + reform_simulation, + *, + include_cliff_impacts=False, + ): + analysis_calls.append( + (baseline_simulation, reform_simulation, include_cliff_impacts) + ) + return analysis + + value = BaselineReformValue(baseline=0.0, reform=0.0) + age_poverty = AgePovertyOutput( + child=value, + adult=value, + senior=value, + all=value, + ) + gender_poverty = GenderPovertyOutput(male=value, female=value) + poverty_outputs = PovertyModuleOutputs( + poverty=PovertyOutput( + poverty=age_poverty, + deep_poverty=age_poverty, + ), + poverty_by_gender=PovertyByGenderOutput( + poverty=gender_poverty, + deep_poverty=gender_poverty, + ), + poverty_by_race=None, + ) + output_calls = [] + + def record(name, value): + def wrapper(*args, **kwargs): + output_calls.append(name) + return value + + return wrapper + + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_budget.build_budgetary_impact", + record( + "budgetary_impact", + BudgetaryImpact( + tax_revenue_impact=0.0, + state_tax_revenue_impact=0.0, + benefit_spending_impact=0.0, + budgetary_impact=0.0, + households=0.0, + baseline_net_income=0.0, + ), + ), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_budget.build_detailed_budget", + record("detailed_budget", 
DetailedBudgetOutput({})), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_distribution.build_decile", + record("decile", DecileOutput(average={}, relative={})), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_inequality.build_inequality", + record( + "inequality", + InequalityOutput( + gini=value, + top_10_pct_share=value, + top_1_pct_share=value, + ), + ), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_poverty.build_poverty_outputs", + record("poverty", poverty_outputs), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_distribution.build_intra_decile_output", + record("intra_decile", IntraDecileOutput(deciles={}, all={})), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_distribution.build_wealth_decile", + record("wealth_decile", None), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_distribution.build_intra_wealth_decile", + record("intra_wealth_decile", None), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_labor.build_labor_supply_response", + record("labor_supply", LaborSupplyResponseOutput({})), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_cliff.build_cliff_impact", + record("cliff", None), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_geographic.build_congressional_district_impact", + record("congressional_district", GeographicImpactOutput([])), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_geographic.build_uk_constituency_impact", + record("uk_constituency", None), + ) + monkeypatch.setattr( + "policyengine_api_simulation.simulation_output_geographic.build_uk_local_authority_impact", + record("uk_local_authority", None), + ) + + builder = SimulationOutputBuilder( + country="us", + simulation_params={"country": "us", "data_version": "mock-data-version"}, + 
country_module=SimpleNamespace( + model=SimpleNamespace(version="mock-model-version"), + economic_impact_analysis=economic_impact_analysis, + ), + dataset=SimpleNamespace(metadata={}), + baseline=baseline, + reform=reform, + ) + + result, timings, counts, segment_tree = _with_observability_timings( + builder.serialize + ) + + assert result["model_version"] == "mock-model-version" + assert result["data_version"] == "mock-data-version" + assert analysis_calls == [(baseline, reform, False)] + assert set(output_calls) == { + "budgetary_impact", + "detailed_budget", + "decile", + "inequality", + "poverty", + "intra_decile", + "wealth_decile", + "intra_wealth_decile", + "labor_supply", + "cliff", + "congressional_district", + "uk_constituency", + "uk_local_authority", + } + assert set(timings) >= { + SegmentName.CALCULATION, + SegmentName.SIMULATION_OUTPUT_BUILD, + SegmentName.ECONOMIC_IMPACT_ANALYSIS, + SegmentName.OUTPUT_MODEL_VERSION, + SegmentName.OUTPUT_DATA_VERSION, + SegmentName.OUTPUT_BUDGETARY_IMPACT, + SegmentName.OUTPUT_DETAILED_BUDGET, + SegmentName.OUTPUT_DECILE, + SegmentName.OUTPUT_INEQUALITY, + SegmentName.OUTPUT_POVERTY, + SegmentName.OUTPUT_INTRA_DECILE, + SegmentName.OUTPUT_WEALTH_DECILE, + SegmentName.OUTPUT_INTRA_WEALTH_DECILE, + SegmentName.OUTPUT_LABOR_SUPPLY, + SegmentName.OUTPUT_CONGRESSIONAL_DISTRICT, + SegmentName.OUTPUT_UK_CONSTITUENCY, + SegmentName.OUTPUT_UK_LOCAL_AUTHORITY, + SegmentName.OUTPUT_CLIFF, + SegmentName.RESPONSE_SERIALIZATION, + SegmentName.SIMULATION_OUTPUT_MODEL_DUMP, + } + assert counts[SegmentName.ECONOMIC_IMPACT_ANALYSIS] == 1 + assert counts[SegmentName.SIMULATION_OUTPUT_MODEL_DUMP] == 1 + assert [node["name"] for node in segment_tree] == [ + SegmentName.CALCULATION, + SegmentName.RESPONSE_SERIALIZATION, + ] + calculation_children = segment_tree[0]["children"] + assert [node["name"] for node in calculation_children] == [ + SegmentName.SIMULATION_OUTPUT_BUILD + ] + output_build_children = calculation_children[0]["children"] 
+ assert {node["name"] for node in output_build_children} >= { + SegmentName.OUTPUT_BUDGETARY_IMPACT, + SegmentName.OUTPUT_DETAILED_BUDGET, + SegmentName.OUTPUT_DECILE, + SegmentName.OUTPUT_INEQUALITY, + SegmentName.OUTPUT_POVERTY, + SegmentName.OUTPUT_INTRA_DECILE, + SegmentName.OUTPUT_WEALTH_DECILE, + SegmentName.OUTPUT_INTRA_WEALTH_DECILE, + SegmentName.OUTPUT_LABOR_SUPPLY, + SegmentName.OUTPUT_CONGRESSIONAL_DISTRICT, + SegmentName.OUTPUT_UK_CONSTITUENCY, + SegmentName.OUTPUT_UK_LOCAL_AUTHORITY, + SegmentName.OUTPUT_CLIFF, + SegmentName.OUTPUT_MODEL_VERSION, + SegmentName.OUTPUT_DATA_VERSION, + } + poverty_node = next( + node + for node in output_build_children + if node["name"] == SegmentName.OUTPUT_POVERTY + ) + assert poverty_node["children"][0]["name"] == (SegmentName.ECONOMIC_IMPACT_ANALYSIS) + assert segment_tree[1]["children"][0]["name"] == ( + SegmentName.SIMULATION_OUTPUT_MODEL_DUMP + ) + + def test_builder_maps_uk_wealth_outputs_and_omits_us_only_race(monkeypatch): output = _build_schema_output(monkeypatch, country="uk").model_dump(mode="json") diff --git a/projects/policyengine-api-simulation/uv.lock b/projects/policyengine-api-simulation/uv.lock index c7b0334a8..fde93d9c1 100644 --- a/projects/policyengine-api-simulation/uv.lock +++ b/projects/policyengine-api-simulation/uv.lock @@ -1418,6 +1418,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/54/f4b3de49f8d7d3a78fd6e6e1a6fd27dd342eb4d82c088b9078c6a32c3808/opentelemetry_exporter_otlp_proto_common-1.30.0-py3-none-any.whl", hash = "sha256:5468007c81aa9c44dc961ab2cf368a29d3475977df83b4e30aeed42aa7bc3b38", size = 18747, upload-time = "2025-02-04T18:16:51.512Z" }, ] +[[package]] +name = "opentelemetry-exporter-otlp-proto-grpc" +version = "1.30.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "deprecated" }, + { name = "googleapis-common-protos" }, + { name = "grpcio" }, + { name = "opentelemetry-api" }, + { name = 
"opentelemetry-exporter-otlp-proto-common" }, + { name = "opentelemetry-proto" }, + { name = "opentelemetry-sdk" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/86/3e/c7246df92c25e6ce95c349ad21597b4471b01ec9471e95d5261f1629fe92/opentelemetry_exporter_otlp_proto_grpc-1.30.0.tar.gz", hash = "sha256:d0f10f0b9b9a383b7d04a144d01cb280e70362cccc613987e234183fd1f01177", size = 26256, upload-time = "2025-02-04T18:17:16.956Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5e/35/d9f63fd84c2ed8dbd407bcbb933db4ed6e1b08e7fbdaca080b9ac309b927/opentelemetry_exporter_otlp_proto_grpc-1.30.0-py3-none-any.whl", hash = "sha256:2906bcae3d80acc54fd1ffcb9e44d324e8631058b502ebe4643ca71d1ff30830", size = 18550, upload-time = "2025-02-04T18:16:52.532Z" }, +] + [[package]] name = "opentelemetry-exporter-otlp-proto-http" version = "1.30.0" @@ -1483,6 +1501,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/55/1c/ec2d816b78edf2404d7b3df6d09eefb690b70bfd191b7da06f76634f1bdc/opentelemetry_instrumentation_fastapi-0.51b0-py3-none-any.whl", hash = "sha256:10513bbc11a1188adb9c1d2c520695f7a8f2b5f4de14e8162098035901cd6493", size = 12117, upload-time = "2025-02-04T18:20:15.267Z" }, ] +[[package]] +name = "opentelemetry-instrumentation-httpx" +version = "0.51b0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b7/d5/4a3990c461ae7e55212115e0f8f3aa412b5ce6493579e85c292245ac69ea/opentelemetry_instrumentation_httpx-0.51b0.tar.gz", hash = "sha256:061d426a04bf5215a859fea46662e5074f920e5cbde7e6ad6825a0a1b595802c", size = 17700, upload-time = "2025-02-04T18:21:31.685Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/c3/ba/23d4ab6402408c01f1c3f32e0c04ea6dae575bf19bcb9a0049c9e768c983/opentelemetry_instrumentation_httpx-0.51b0-py3-none-any.whl", hash = "sha256:2e3fdf755ba6ead6ab43031497c3d55d4c796d0368eccc0ce48d304b7ec6486a", size = 14109, upload-time = "2025-02-04T18:20:19.947Z" }, +] + [[package]] name = "opentelemetry-instrumentation-logging" version = "0.51b0" @@ -1760,6 +1794,28 @@ requires-dist = [ ] provides-extras = ["test", "build"] +[[package]] +name = "policyengine-observability" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-grpc" }, + { name = "opentelemetry-exporter-otlp-proto-http" }, + { name = "opentelemetry-instrumentation-fastapi" }, + { name = "opentelemetry-instrumentation-httpx" }, + { name = "opentelemetry-sdk" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/fc/2a/5e0e4c0f30b20a59ffff14364e15374f9eee494f658f41559aba97c152a2/policyengine_observability-1.3.0.tar.gz", hash = "sha256:4c2f3bb2ee59886aef6c90f1edb9c0b957ace75b1518616c0e61905b8be431e0", size = 107961, upload-time = "2026-07-01T21:10:53.084Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a3/a2/80e4bc1c4923e8f4c04483bfd22df2500b0d132c4ce1a6dec6749c94d947/policyengine_observability-1.3.0-py3-none-any.whl", hash = "sha256:7a9444e2cb8ddd7f6d54d601bd1e0a7651bd44783b78627b293a27e741f91930", size = 31087, upload-time = "2026-07-01T21:10:51.793Z" }, +] + +[package.optional-dependencies] +fastapi = [ + { name = "fastapi" }, +] + [[package]] name = "policyengine-simulation-api-project" version = "0.1.0" @@ -1772,6 +1828,7 @@ dependencies = [ { name = "policyengine" }, { name = "policyengine-core" }, { name = "policyengine-fastapi" }, + { name = "policyengine-observability", extra = ["fastapi"] }, { name = "policyengine-uk" }, { name = "policyengine-us" }, { name = "pydantic-settings" }, @@ -1801,6 +1858,7 @@ 
requires-dist = [ { name = "policyengine", specifier = "==4.18.9" }, { name = "policyengine-core", specifier = "==3.28.0" }, { name = "policyengine-fastapi", editable = "../../libs/policyengine-fastapi" }, + { name = "policyengine-observability", extras = ["fastapi"], specifier = ">=1.3.0,<2" }, { name = "policyengine-uk", specifier = "==2.89.2" }, { name = "policyengine-us", specifier = "==1.752.2" }, { name = "pydantic-settings", specifier = ">=2.7.1,<3.0.0" },