Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3410.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added baseline Grafana-compatible OTLP observability and stage timing telemetry for simulation submission flows.
80 changes: 67 additions & 13 deletions policyengine_api/libs/simulation_api_modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@

import os
from dataclasses import dataclass
from time import perf_counter
from typing import Optional

import httpx

from policyengine_api.gcp_logging import logger
from policyengine_api.observability import (
build_span_attributes,
get_observability,
observe_stage,
)


@dataclass
Expand Down Expand Up @@ -48,6 +54,7 @@ def __init__(self):
"https://policyengine--policyengine-simulation-gateway-web-app.modal.run",
)
self.client = httpx.Client(timeout=30.0)
self.observability = get_observability("policyengine-api-modal-client")

def run(self, payload: dict) -> ModalSimulationExecution:
"""
Expand All @@ -69,6 +76,8 @@ def run(self, payload: dict) -> ModalSimulationExecution:
httpx.HTTPStatusError
If the API returns an error response.
"""
start = perf_counter()
telemetry = dict((payload or {}).get("_telemetry") or {})
try:
# Map field names from SimulationOptions to Modal API format
# SimulationOptions uses 'model_version', Modal expects 'version'
Expand All @@ -78,19 +87,43 @@ def run(self, payload: dict) -> ModalSimulationExecution:
# Remove data_version as Modal doesn't use it
modal_payload.pop("data_version", None)

response = self.client.post(
f"{self.base_url}/simulate/economy/comparison",
json=modal_payload,
)
response.raise_for_status()
data = response.json()

with observe_stage(
self.observability,
stage="job.submitted",
service="policyengine-api-modal-client",
telemetry=telemetry,
success_status="submitted",
record_failure_counter=True,
timer=perf_counter,
details={"base_url": self.base_url},
) as stage_observation:
with self.observability.span(
"modal.submit_simulation",
build_span_attributes(
telemetry,
service="policyengine-api-modal-client",
),
) as span:
response = self.client.post(
f"{self.base_url}/simulate/economy/comparison",
json=modal_payload,
)
span.set_attribute("http.status_code", response.status_code)
response.raise_for_status()
data = response.json()
stage_observation.details.update(
{
"http_status_code": response.status_code,
"job_id": data.get("job_id"),
}
)
logger.log_struct(
{
"message": "Modal simulation job submitted",
"job_id": data.get("job_id"),
"run_id": data.get("run_id"),
"status": data.get("status"),
"duration_seconds": perf_counter() - start,
},
severity="INFO",
)
Expand All @@ -107,7 +140,7 @@ def run(self, payload: dict) -> ModalSimulationExecution:
logger.log_struct(
{
"message": f"Modal API HTTP error: {e.response.status_code}",
"run_id": (payload.get("_telemetry") or {}).get("run_id"),
"run_id": telemetry.get("run_id"),
"response_text": e.response.text[:500],
},
severity="ERROR",
Expand All @@ -118,7 +151,7 @@ def run(self, payload: dict) -> ModalSimulationExecution:
logger.log_struct(
{
"message": f"Modal API request error: {str(e)}",
"run_id": (payload.get("_telemetry") or {}).get("run_id"),
"run_id": telemetry.get("run_id"),
},
severity="ERROR",
)
Expand Down Expand Up @@ -170,11 +203,32 @@ def get_execution_by_id(self, job_id: str) -> ModalSimulationExecution:
ModalSimulationExecution
Execution object with current status and result if complete.
"""
start = perf_counter()
try:
response = self.client.get(f"{self.base_url}/jobs/{job_id}")
# Note: Modal returns 202 for running, 200 for complete, 500 for failed
# We handle all cases by checking the status field in the response
data = response.json()
with self.observability.span(
"modal.poll_job_status",
build_span_attributes(
None,
service="policyengine-api-modal-client",
job_id=job_id,
),
) as span:
response = self.client.get(f"{self.base_url}/jobs/{job_id}")
span.set_attribute("http.status_code", response.status_code)
# Note: Modal returns 202 for running, 200 for complete, 500 for failed
# We handle all cases by checking the status field in the response
data = response.json()

logger.log_struct(
{
"message": "Modal simulation job polled",
"job_id": job_id,
"run_id": data.get("run_id"),
"status": data.get("status"),
"duration_seconds": perf_counter() - start,
},
severity="INFO",
)

return ModalSimulationExecution(
job_id=job_id,
Expand Down
Loading
Loading