Skip to content
Draft
3 changes: 3 additions & 0 deletions changelog.d/749.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Introduced typed local H5 request construction with `AreaBuildRequest`,
`AreaFilter`, and `USAreaCatalog`, while keeping the worker's legacy
`--work-items` path available for backward compatibility.
131 changes: 107 additions & 24 deletions modal_app/local_area.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@

from modal_app.images import cpu_image as image # noqa: E402
from modal_app.resilience import reconcile_run_dir_fingerprint # noqa: E402
from policyengine_us_data.calibration.local_h5.fingerprinting import ( # noqa: E402
FingerprintingService,
PublishingInputBundle,
)
from policyengine_us_data.calibration.local_h5.partitioning import ( # noqa: E402
partition_weighted_work_items,
)
Expand Down Expand Up @@ -306,6 +310,65 @@ def get_version() -> str:
return pyproject["project"]["version"]


def _build_publishing_input_bundle(
    *,
    weights_path: Path,
    dataset_path: Path,
    db_path: Path | None,
    geography_path: Path | None,
    calibration_package_path: Path | None,
    run_config_path: Path | None,
    run_id: str,
    version: str,
    n_clones: int | None,
    seed: int,
    legacy_blocks_path: Path | None = None,
) -> PublishingInputBundle:
    """Build the normalized coordinator input bundle for one publish scope.

    Pure construction: every argument is forwarded to ``PublishingInputBundle``
    unchanged. The only renames are ``dataset_path`` -> ``source_dataset_path``,
    ``db_path`` -> ``target_db_path``, and ``geography_path`` ->
    ``exact_geography_path``, which map the coordinator's local names onto the
    bundle's field names.
    """
    bundle_fields = {
        "weights_path": weights_path,
        "source_dataset_path": dataset_path,
        "target_db_path": db_path,
        "exact_geography_path": geography_path,
        "calibration_package_path": calibration_package_path,
        "run_config_path": run_config_path,
        "run_id": run_id,
        "version": version,
        "n_clones": n_clones,
        "seed": seed,
        "legacy_blocks_path": legacy_blocks_path,
    }
    return PublishingInputBundle(**bundle_fields)


def _resolve_scope_fingerprint(
    *,
    inputs: PublishingInputBundle,
    scope: str,
    expected_fingerprint: str = "",
) -> str:
    """Compute the scope fingerprint while preserving pinned resume values.

    A non-empty ``expected_fingerprint`` always wins so that resumes keyed on
    the pinned value keep working; a mismatch against the freshly computed
    fingerprint is only reported via a warning, never acted on.
    """
    service = FingerprintingService()
    computed = service.compute_scope_fingerprint(
        service.build_traceability(inputs=inputs, scope=scope)
    )
    if not expected_fingerprint:
        return computed
    if expected_fingerprint == computed:
        print(
            f"Using pinned fingerprint from pipeline: {expected_fingerprint}"
        )
    else:
        print(
            "WARNING: Pinned fingerprint differs from current "
            f"{scope} scope fingerprint. "
            "Preserving pinned value for backward-compatible resume.\n"
            f"  Pinned: {expected_fingerprint}\n"
            f"  Current: {computed}"
        )
    return expected_fingerprint


def partition_work(
work_items: List[Dict],
num_workers: int,
Expand Down Expand Up @@ -553,6 +616,12 @@ def validate_staging(branch: str, run_id: str, version: str = "") -> Dict:
if not version:
version = run_id.split("_", 1)[0]

# PR 9 migration note:
# The coordinator still enumerates states, districts, and cities inline
# and emits legacy work_items. This is intentionally temporary for the
# dual-path migration. The target cleanup is to delegate regional request
# enumeration to USAreaCatalog and send typed --requests-json payloads to
# workers so area construction no longer lives in the coordinator.
result = subprocess.run(
[
"uv",
Expand Down Expand Up @@ -783,6 +852,7 @@ def coordinate_publish(
db_path = artifacts / "policy_data.db"
dataset_path = artifacts / "source_imputed_stratified_extended_cps.h5"
config_json_path = artifacts / "unified_run_config.json"
calibration_package_path = artifacts / "calibration_package.pkl"

required = {
"weights": weights_path,
Expand Down Expand Up @@ -824,30 +894,26 @@ def coordinate_publish(
validate = False

# Fingerprint-based cache invalidation
if expected_fingerprint:
fingerprint = expected_fingerprint
print(f"Using pinned fingerprint from pipeline: {fingerprint}")
else:
fp_result = subprocess.run(
[
"uv",
"run",
"python",
"-c",
f"""
from policyengine_us_data.calibration.publish_local_area import (
compute_input_fingerprint,
)
print(compute_input_fingerprint("{weights_path}", "{dataset_path}", {n_clones}, seed=42))
""",
],
capture_output=True,
text=True,
env=os.environ.copy(),
)
if fp_result.returncode != 0:
raise RuntimeError(f"Failed to compute fingerprint: {fp_result.stderr}")
fingerprint = fp_result.stdout.strip()
fingerprint_inputs = _build_publishing_input_bundle(
weights_path=weights_path,
dataset_path=dataset_path,
db_path=db_path,
geography_path=geography_path,
calibration_package_path=(
calibration_package_path if calibration_package_path.exists() else None
),
run_config_path=config_json_path if config_json_path.exists() else None,
run_id=run_id,
version=version,
n_clones=n_clones,
seed=42,
legacy_blocks_path=artifacts / "stacked_blocks.npy",
)
fingerprint = _resolve_scope_fingerprint(
inputs=fingerprint_inputs,
scope="regional",
expected_fingerprint=expected_fingerprint,
)
reconcile_action = reconcile_run_dir_fingerprint(run_dir, fingerprint)
if reconcile_action == "resume":
print(f"Inputs unchanged ({fingerprint}), resuming...")
Expand Down Expand Up @@ -1099,6 +1165,22 @@ def coordinate_national_publish(
"geography_assignment.npz": "national_geography_assignment.npz",
},
)
fingerprint_inputs = _build_publishing_input_bundle(
weights_path=weights_path,
dataset_path=dataset_path,
db_path=db_path,
geography_path=geography_path,
calibration_package_path=None,
run_config_path=config_json_path if config_json_path.exists() else None,
run_id=run_id,
version=version,
n_clones=n_clones,
seed=42,
)
fingerprint = _resolve_scope_fingerprint(
inputs=fingerprint_inputs,
scope="national",
)
run_dir = staging_dir / run_id
run_dir.mkdir(parents=True, exist_ok=True)

Expand Down Expand Up @@ -1206,6 +1288,7 @@ def coordinate_national_publish(
f"{version}. Run main_national_promote to publish."
),
"run_id": run_id,
"fingerprint": fingerprint,
"national_validation": national_validation_output,
}

Expand Down
20 changes: 18 additions & 2 deletions modal_app/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,25 @@ class RunMetadata:
error: Optional[str] = None
resume_history: list = field(default_factory=list)
fingerprint: Optional[str] = None
regional_fingerprint: Optional[str] = None

def __post_init__(self) -> None:
if self.regional_fingerprint is None and self.fingerprint is not None:
self.regional_fingerprint = self.fingerprint
if self.fingerprint is None and self.regional_fingerprint is not None:
self.fingerprint = self.regional_fingerprint

def to_dict(self) -> dict:
    """Serialize to a plain dict, backfilling the legacy ``fingerprint`` key.

    Uses ``dataclasses.asdict`` for the base payload, then copies
    ``regional_fingerprint`` into ``fingerprint`` when only the former is set,
    so consumers of the old metadata schema keep seeing a value.
    """
    payload = asdict(self)
    regional = payload.get("regional_fingerprint")
    if payload.get("fingerprint") is None and regional is not None:
        payload["fingerprint"] = regional
    return payload

@classmethod
def from_dict(cls, data: dict) -> "RunMetadata":
    """Rehydrate metadata from a dict without mutating the caller's copy.

    Mirrors a legacy ``fingerprint`` value into ``regional_fingerprint`` when
    the latter is absent, so records written before the field split still
    load with both aliases populated.
    """
    payload = {**data}
    has_regional = payload.get("regional_fingerprint") is not None
    if not has_regional and payload.get("fingerprint") is not None:
        payload["regional_fingerprint"] = payload["fingerprint"]
    return cls(**payload)


Expand Down Expand Up @@ -912,7 +925,9 @@ def run_pipeline(
n_clones=n_clones,
validate=True,
run_id=run_id,
expected_fingerprint=meta.fingerprint or "",
expected_fingerprint=(
meta.regional_fingerprint or meta.fingerprint or ""
),
)
print(f" → coordinate_publish fc: {regional_h5_handle.object_id}")

Expand Down Expand Up @@ -948,6 +963,7 @@ def run_pipeline(
if isinstance(regional_h5_result, dict) and regional_h5_result.get(
"fingerprint"
):
meta.regional_fingerprint = regional_h5_result["fingerprint"]
meta.fingerprint = regional_h5_result["fingerprint"]
write_run_meta(meta, pipeline_volume)

Expand Down
Loading