Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion projects/policyengine-api-simulation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,44 @@ Operational notes:
`run_function` call in `src/modal/app.py` for one deploy.
- Only pure default requests read the baked folder: any request naming an
explicit `data` dataset or pinning a `data_version` bypasses it (see
`_load_dataset` in `simulation_runtime.py`).
`_load_dataset` in `simulation_runtime.py`).

## Observability

The service currently runs two observability backends in parallel:

- `policyengine-observability` emits structured request, operation, error,
and runtime timing logs.
- Logfire remains enabled as the legacy platform for existing dashboards and
alerting while we evaluate replacing it with another observability platform.

New instrumentation should be added through `policyengine-observability`; the
Logfire path is retained for continuity during that evaluation.

For `policyengine-observability`, this service intentionally forces:

- `log_destinations=("stdout",)`
- `otel_enabled=False`
- `google_cloud_project=None`

Cloud Logging and OpenTelemetry (OTel) export are therefore disabled until the
target GCP project is ready. The package does not currently provide
memory-usage measurements, so memory usage is not emitted.

Modal captures container output and exposes it through the app logs UI and
CLI. Useful `policyengine-observability` checks after deploying:

```bash
modal app logs policyengine-simulation-gateway --tail 100
modal app logs policyengine-simulation-gateway --tail 100 --search policyengine.observability
modal app logs policyengine-simulation-py<version> --tail 100 --search run_simulation
modal app dashboard policyengine-simulation-gateway
```

If using Modal source filters, include both `stdout` and `stderr`. The
observability destination is named `stdout`, but it is currently implemented
with Python's standard `logging` stream handler (`logging.StreamHandler`
defaults to `stderr`), so records may appear under either source.

Logfire continues to use the `policyengine-logfire` Modal secret. Worker
functions and the gateway configure Logfire only when `LOGFIRE_TOKEN` is
present.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from fastapi import FastAPI
from fastapi.testclient import TestClient

from policyengine_api_simulation.observability import (
init_simulation_observability,
)
from src.modal.gateway.auth import require_auth
from src.modal.gateway.endpoints import router

Expand All @@ -21,6 +24,7 @@ def create_gateway_app(*, authenticate: bool = True) -> FastAPI:
description="Test instance for unit tests",
version="0.0.1",
)
init_simulation_observability(app, service_role="modal_gateway")
app.include_router(router)
if authenticate:
app.dependency_overrides[require_auth] = lambda: None
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions projects/policyengine-api-simulation/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies = [
"policyengine-us==1.752.2",
"tables>=3.10.2",
"modal>=0.73.0",
"policyengine-observability[fastapi]>=1.3.0,<2",
"logfire>=3.0.0",
]

Expand Down
125 changes: 85 additions & 40 deletions projects/policyengine-api-simulation/src/modal/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,22 @@
import modal
import os

from policyengine_observability import operation, set_attribute

from src.modal._image_setup import prebuild_country_datasets, snapshot_models
from src.modal.dependency_pins import project_dependency_pin
from src.modal.logfire_legacy import (
configure_logfire,
flush_logfire,
legacy_logfire_attributes,
logfire_span,
)
from src.modal.logging_redaction import redact_params_for_logging
from policyengine_api_simulation.observability import (
configure_process_observability,
init_process_observability,
process_static_attributes,
)
from policyengine_api_simulation.release_bundle import get_bundled_country_model_version


Expand Down Expand Up @@ -89,7 +102,7 @@ def get_app_name(policyengine_version: str) -> str:
gcp_secret = modal.Secret.from_name("gcp-credentials", environment_name="main")
data_secret = modal.Secret.from_name("policyengine-data-credentials")
hf_secret = modal.Secret.from_name("huggingface-token")
# Logfire secret is environment-specific
# Legacy Logfire export remains while we evaluate a replacement observability platform.
logfire_secret = modal.Secret.from_name("policyengine-logfire")


Expand Down Expand Up @@ -130,7 +143,8 @@ def build_base_simulation_image() -> modal.Image:
"uv",
"fastapi>=0.115.0",
"tables>=3.10.2",
"logfire",
"logfire>=3.0.0",
"policyengine-observability[fastapi]>=1.3.0,<2",
)
.run_commands(
bundle_install_command(POLICYENGINE_VERSION),
Expand Down Expand Up @@ -169,20 +183,28 @@ def build_base_simulation_image() -> modal.Image:
)


def configure_logfire(service_name: str = "policyengine-simulation"):
"""Configure Logfire for observability. Call at start of each function."""
import logfire
def _configure_modal_observability(
    *,
    service_role: str,
    modal_function_name: str,
) -> dict:
    """Set up process-level observability for one Modal function.

    Configures and then initialises ``policyengine-observability`` for the
    current process, and returns the static identity attributes for
    ``service_role``.

    Args:
        service_role: Role label forwarded to the configure, init and
            static-attribute helpers.
        modal_function_name: Name of the Modal function being configured;
            forwarded to ``configure_process_observability``.

    Returns:
        Whatever ``process_static_attributes`` returns for ``service_role``.
    """
    process_settings = {
        "platform": "modal",
        "service_role": service_role,
        "modal_app_name": APP_NAME,
        "modal_function_name": modal_function_name,
    }
    configure_process_observability(**process_settings)
    init_process_observability(service_role=service_role)

    # Worker operations have no FastAPI adapter to inject static identity
    # attributes, so the caller must merge these into its operation attrs.
    static_attributes = process_static_attributes(service_role=service_role)
    return static_attributes

token = os.environ.get("LOGFIRE_TOKEN", "")
if not token:
return

logfire.configure(
service_name=service_name,
token=token,
environment=os.environ.get("LOGFIRE_ENVIRONMENT", "production"),
console=False,
)
def _set_modal_call_attributes() -> None:
try:
set_attribute("function_call_id", modal.current_function_call_id())
except Exception:
pass


@app.function(
Expand All @@ -199,28 +221,37 @@ def run_simulation(params: dict) -> dict:
Execute economic simulation.

Imports the snapshotted implementation at runtime.
Logs input params and output result to Logfire for observability.
Emits redacted operation data to both observability systems.
"""
import logfire

from policyengine_api_simulation.simulation_runtime import run_simulation_impl

configure_logfire()
static_attributes = _configure_modal_observability(
service_role="simulation_worker",
modal_function_name="run_simulation",
)

# We deliberately avoid sending full ``params`` or ``result`` blobs to
# Logfire: both can embed signed URLs, reform parameter trees with
# sensitive policy details, or result payloads large enough to blow the
# span attribute size budget. The redacted summary keeps correlation
# traceability via run_id while leaving the heavy payload in memory.
redacted_params = redact_params_for_logging(params)
# either observability system: both can embed signed URLs, reform
# parameter trees with sensitive policy details, or result payloads
# large enough to blow attribute budgets. The redacted summary keeps
# correlation traceability via run_id while leaving the heavy payload
# in memory.
redacted_params = {
**redact_params_for_logging(params),
**static_attributes,
**legacy_logfire_attributes(),
}
logfire_enabled = False
try:
with logfire.span(
"run_simulation",
**redacted_params,
):
return run_simulation_impl(params)
with operation("run_simulation", flavor="modal_function", **redacted_params):
logfire_enabled = configure_logfire("policyengine-simulation")
_set_modal_call_attributes()
with logfire_span(logfire_enabled, "run_simulation", **redacted_params):
from policyengine_api_simulation.simulation_runtime import (
run_simulation_impl,
)

return run_simulation_impl(params)
finally:
logfire.force_flush()
flush_logfire(logfire_enabled)


@app.function(
Expand All @@ -234,18 +265,32 @@ def run_simulation(params: dict) -> dict:
)
def run_budget_window_batch(params: dict) -> dict:
"""Execute a multi-year budget-window batch orchestration."""
import logfire

from src.modal.budget_window_batch import run_budget_window_batch_impl

configure_logfire()
static_attributes = _configure_modal_observability(
service_role="budget_window_worker",
modal_function_name="run_budget_window_batch",
)

redacted_params = redact_params_for_logging(params)
redacted_params = {
**redact_params_for_logging(params),
**static_attributes,
**legacy_logfire_attributes(),
}
logfire_enabled = False
try:
with logfire.span(
with operation(
"run_budget_window_batch",
flavor="modal_function",
**redacted_params,
):
return run_budget_window_batch_impl(params)
logfire_enabled = configure_logfire("policyengine-simulation")
_set_modal_call_attributes()
with logfire_span(
logfire_enabled,
"run_budget_window_batch",
**redacted_params,
):
from src.modal.budget_window_batch import run_budget_window_batch_impl

return run_budget_window_batch_impl(params)
finally:
logfire.force_flush()
flush_logfire(logfire_enabled)
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@
from typing import Any

import modal
from policyengine_observability import segment, set_attribute

from src.modal.budget_window_context import build_batch_context
from src.modal.budget_window_scheduler import BudgetWindowBatchRunner
from policyengine_api_simulation.observability import SegmentName


def run_budget_window_batch_impl(params: dict[str, Any]) -> dict[str, Any]:
context = build_batch_context(
params,
batch_job_id=modal.current_function_call_id(),
)
batch_job_id = modal.current_function_call_id()
set_attribute("batch_job_id", batch_job_id)
with segment(SegmentName.BUDGET_WINDOW_CONTEXT):
context = build_batch_context(
params,
batch_job_id=batch_job_id,
)
runner = BudgetWindowBatchRunner(context)
return runner.run()
Loading
Loading