Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ node_modules
!age_state.csv
!agi_state.csv
!soi_targets.csv
!policyengine_us_data/storage/calibration_targets/aca_marketplace_state_metal_selection_2024.csv
!policyengine_us_data/storage/social_security_aux.csv
!policyengine_us_data/storage/SSPopJul_TR2024.csv
!policyengine_us_data/storage/national_and_district_rents_2023.csv
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ database:
python policyengine_us_data/db/etl_state_income_tax.py --year $(YEAR)
python policyengine_us_data/db/etl_irs_soi.py --year $(YEAR)
python policyengine_us_data/db/etl_aca_agi_state_targets.py --year $(YEAR)
python policyengine_us_data/db/etl_aca_marketplace.py --year $(YEAR)
python policyengine_us_data/db/etl_pregnancy.py --year $(YEAR)
python policyengine_us_data/db/validate_database.py

Expand Down
2 changes: 2 additions & 0 deletions changelog.d/618.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add an ACA marketplace ETL that loads state-level HC.gov bronze-plan
selection targets for APTC recipients into the calibration database.
8 changes: 8 additions & 0 deletions policyengine_us_data/calibration/target_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ include:
geo_level: state
domain_variable: tanf

# === STATE — ACA marketplace APTC and bronze-plan enrollment counts ===
- variable: tax_unit_count
geo_level: state
domain_variable: used_aca_ptc
- variable: tax_unit_count
geo_level: state
domain_variable: selected_marketplace_plan_benchmark_ratio,used_aca_ptc

# === STATE — fine AGI bracket targets (stubs 9/10 from in55cmcsv) ===
- variable: person_count
geo_level: state
Expand Down
24 changes: 18 additions & 6 deletions policyengine_us_data/db/create_database_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,24 @@ def validate_parent_child_constraints(mapper, connection, target: Stratum):
THEN sc.value END),
'US'
) AS geographic_id,
GROUP_CONCAT(DISTINCT CASE
WHEN sc.constraint_variable NOT IN (
'state_fips', 'congressional_district_geoid',
'tax_unit_is_filer', 'ucgid_str'
) THEN sc.constraint_variable
END) AS domain_variable
-- Compute domain_variable via a correlated subquery so we can sort
-- the distinct constraint names alphabetically before concatenation.
-- We can't use `GROUP_CONCAT(DISTINCT ... ORDER BY ...)` because the
-- `ORDER BY` form inside aggregates requires SQLite >= 3.44, and the
-- Modal runner ships an older libsqlite.
(
SELECT GROUP_CONCAT(cv, ',')
FROM (
SELECT DISTINCT sc2.constraint_variable AS cv
FROM stratum_constraints sc2
WHERE sc2.stratum_id = t.stratum_id
AND sc2.constraint_variable NOT IN (
'state_fips', 'congressional_district_geoid',
'tax_unit_is_filer', 'ucgid_str'
)
ORDER BY sc2.constraint_variable
)
) AS domain_variable
FROM targets t
LEFT JOIN stratum_constraints sc ON t.stratum_id = sc.stratum_id
GROUP BY t.target_id, t.stratum_id, t.variable,
Expand Down
1 change: 1 addition & 0 deletions policyengine_us_data/db/create_field_valid_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def populate_field_valid_values(session: Session) -> None:
("source", "Census ACS S0101", "survey"),
("source", "IRS SOI", "administrative"),
("source", "CMS Marketplace", "administrative"),
("source", "CMS 2024 OEP state metal status PUF", "administrative"),
("source", "CMS Medicaid", "administrative"),
("source", "Census ACS S2704", "survey"),
("source", "USDA FNS SNAP", "administrative"),
Expand Down
236 changes: 236 additions & 0 deletions policyengine_us_data/db/etl_aca_marketplace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
from __future__ import annotations

import logging
from pathlib import Path

import pandas as pd
from sqlmodel import Session, create_engine

from policyengine_us_data.calibration.calibration_utils import STATE_CODES
from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
Target,
)
from policyengine_us_data.storage import CALIBRATION_FOLDER, STORAGE_FOLDER
from policyengine_us_data.utils.db import etl_argparser, get_geographic_strata

logger = logging.getLogger(__name__)

# A `selected_marketplace_plan_benchmark_ratio` of exactly 1.0 stands for
# benchmark silver coverage; bronze plan selections are the subset that falls
# strictly below this ratio.
BENCHMARK_SILVER_RATIO = 1.0

# Default location of the checked-in state/metal/enrollment-status CSV.
STATE_METAL_SELECTION_PATH = (
    CALIBRATION_FOLDER / "aca_marketplace_state_metal_selection_2024.csv"
)

# Inverse lookup of STATE_CODES, keyed by what STATE_CODES stores as values
# (state abbreviations) and yielding its keys (FIPS codes).
STATE_ABBR_TO_FIPS = dict(zip(STATE_CODES.values(), STATE_CODES.keys()))


def _extra_args(parser) -> None:
    """Register this ETL's additional command-line option on ``parser``.

    Adds ``--state-metal-csv``, parsed as a ``Path`` and defaulting to
    ``STATE_METAL_SELECTION_PATH``.
    """
    option_help = "State-metal CMS OEP proxy CSV. Default: %(default)s"
    parser.add_argument(
        "--state-metal-csv",
        default=STATE_METAL_SELECTION_PATH,
        type=Path,
        help=option_help,
    )


def extract_aca_marketplace_state_metal_data(
    state_metal_csv_path: Path,
) -> pd.DataFrame:
    """Read the normalized CMS state metal-status CSV into a DataFrame.

    The extract step is kept explicit even though the input is checked into
    the repository: the original CMS 2024 OEP state metal status PUF is not
    currently pulled from a stable direct-download endpoint in CI, so a
    normalized copy is stored at
    `policyengine_us_data/storage/calibration_targets/aca_marketplace_state_metal_selection_2024.csv`.

    Source (CMS Marketplace Open Enrollment Period Public Use Files):
    https://www.cms.gov/marketplace/resources/data/public-use-files

    Steps to regenerate or refresh the file:
    1. Download the CMS 2024 OEP State, Metal Level, and Enrollment Status
       PUF from the URL above.
    2. Keep one row per state/platform/metal/enrollment-status combination.
    3. Retain the `state_code`, `platform`, `metal_level`,
       `enrollment_status`, `consumers`, and `aptc_consumers` columns.
    4. Write the normalized result back to `state_metal_csv_path`.
    """
    state_metal_frame = pd.read_csv(state_metal_csv_path)
    return state_metal_frame


def build_state_marketplace_bronze_aptc_targets(
    state_metal_df: pd.DataFrame,
) -> pd.DataFrame:
    """Build HC.gov state bronze-selection targets among APTC consumers.

    The 2024 CMS state-metal-status PUF exposes:
    - metal rows (`B`, `G`, `S`) with enrollment_status=`All`
    - aggregate rows (`All`) broken out by enrollment status (`01-atv`, etc.)

    We use:
    - total APTC consumers = sum of `aptc_consumers` for `metal_level == All`
      across enrollment statuses
    - bronze APTC consumers = `aptc_consumers` on the bronze row

    Only rows with `platform == "HC.gov"` and a non-null `aptc_consumers` are
    considered. A state must have both a total and a bronze row to appear in
    the output (inner join). State codes absent from `STATE_ABBR_TO_FIPS` are
    dropped, with a warning so the omission is visible in the logs.

    Args:
        state_metal_df: Frame with the `state_code`, `platform`,
            `metal_level`, `enrollment_status`, `consumers` and
            `aptc_consumers` columns. It is not modified.

    Returns:
        One row per state, sorted by `state_code`, with the columns `year`,
        `source`, `state_code`, `marketplace_aptc_consumers`,
        `marketplace_consumers`, `bronze_aptc_consumers`, `bronze_consumers`,
        `state_fips` and `bronze_aptc_share`.

    Raises:
        ValueError: If a state would appear more than once in the output
            (repeated bronze rows), or if a state's bronze APTC consumers
            exceed its total APTC consumers.
    """
    hc_gov = state_metal_df[state_metal_df["platform"] == "HC.gov"]
    has_aptc = hc_gov["aptc_consumers"].notna()

    total_rows = hc_gov[(hc_gov["metal_level"] == "All") & has_aptc]
    bronze_rows = hc_gov[
        (hc_gov["metal_level"] == "B")
        & (hc_gov["enrollment_status"] == "All")
        & has_aptc
    ]

    total_aptc = total_rows.groupby("state_code", as_index=False).agg(
        marketplace_aptc_consumers=("aptc_consumers", "sum"),
        marketplace_consumers=("consumers", "sum"),
    )
    bronze_aptc = bronze_rows[["state_code", "aptc_consumers", "consumers"]].rename(
        columns={
            "aptc_consumers": "bronze_aptc_consumers",
            "consumers": "bronze_consumers",
        }
    )

    result = total_aptc.merge(bronze_aptc, on="state_code", how="inner")
    result["state_fips"] = result["state_code"].map(STATE_ABBR_TO_FIPS)

    unmapped = result["state_fips"].isna()
    if unmapped.any():
        logger.warning(
            "Dropping state codes without a FIPS mapping: %s",
            result.loc[unmapped, "state_code"].tolist(),
        )
    result = result[~unmapped].copy()
    result["state_fips"] = result["state_fips"].astype(int)

    # Totals are already unique per state after the groupby, so a repeated
    # state here can only come from repeated bronze rows in the input. Left
    # unchecked, each repeat would become its own target row downstream.
    repeated = result["state_code"].duplicated(keep=False)
    if repeated.any():
        duplicate_states = (
            result.loc[repeated, "state_code"].drop_duplicates().tolist()
        )
        raise ValueError(
            "Multiple bronze rows found for states: "
            f"{duplicate_states}. Source CSV likely corrupted."
        )

    invalid_bronze = (
        result["bronze_aptc_consumers"] > result["marketplace_aptc_consumers"]
    )
    if invalid_bronze.any():
        bad_states = result.loc[invalid_bronze, "state_code"].tolist()
        raise ValueError(
            "Bronze APTC consumers exceed total APTC consumers for states: "
            f"{bad_states}. Source CSV likely corrupted."
        )

    result["bronze_aptc_share"] = (
        result["bronze_aptc_consumers"] / result["marketplace_aptc_consumers"]
    )
    result.insert(0, "year", 2024)
    result.insert(1, "source", "cms_2024_oep_state_metal_status_puf")
    return result.sort_values("state_code").reset_index(drop=True)


def load_state_marketplace_bronze_aptc_targets(
    targets_df: pd.DataFrame,
    year: int,
) -> None:
    """Write per-state APTC and bronze-APTC strata and targets to the database.

    For every row of ``targets_df`` whose ``state_fips`` has a state-level
    geographic stratum, two strata are created:

    * an APTC stratum (child of the state stratum) carrying a
      ``tax_unit_count`` target equal to ``marketplace_aptc_consumers``;
    * a bronze stratum (child of the APTC stratum) carrying a
      ``tax_unit_count`` target equal to ``bronze_aptc_consumers``.

    Rows without a matching state stratum are skipped with a warning. All
    inserts are committed together once every row has been processed.

    Args:
        targets_df: Output of ``build_state_marketplace_bronze_aptc_targets``;
            the ``state_fips``, ``marketplace_aptc_consumers`` and
            ``bronze_aptc_consumers`` columns are read.
        year: Value stored as the ``period`` of every target.
    """
    source_label = "CMS 2024 OEP state metal status PUF"
    database_path = STORAGE_FOLDER / "calibration" / "policy_data.db"
    engine = create_engine(f"sqlite:///{database_path}")

    with Session(engine) as session:
        geo_strata = get_geographic_strata(session)

        for record in targets_df.itertuples(index=False):
            state_fips = int(record.state_fips)
            state_stratum_id = geo_strata["state"].get(state_fips)
            if state_stratum_id is None:
                logger.warning(
                    "No state geographic stratum for FIPS %s, skipping", state_fips
                )
                continue

            # No `tax_unit_is_filer == 1` restriction is applied on purpose:
            # the CMS figures describe marketplace coverage groups, not the
            # IRS filer universe, so the nearest calibration entity is a tax
            # unit with positive modeled APTC use.
            aptc_stratum = Stratum(
                parent_stratum_id=state_stratum_id,
                notes=f"State FIPS {state_fips} Marketplace APTC recipients",
            )
            aptc_constraint_specs = [
                ("state_fips", "==", str(state_fips)),
                ("used_aca_ptc", ">", "0"),
            ]
            aptc_stratum.constraints_rel = [
                StratumConstraint(
                    constraint_variable=variable,
                    operation=operation,
                    value=value,
                )
                for variable, operation, value in aptc_constraint_specs
            ]
            # `tax_unit_count` is used instead of household/person counts
            # because insurance groups map most closely to PolicyEngine tax
            # units in the current calibration schema.
            aptc_target = Target(
                variable="tax_unit_count",
                period=year,
                value=float(record.marketplace_aptc_consumers),
                active=True,
                source=source_label,
                notes="HC.gov APTC consumers across all enrollment statuses",
            )
            aptc_stratum.targets_rel.append(aptc_target)
            session.add(aptc_stratum)
            # Flush so `aptc_stratum.stratum_id` is available as the parent
            # id of the bronze stratum created next.
            session.flush()

            bronze_stratum = Stratum(
                parent_stratum_id=aptc_stratum.stratum_id,
                notes=f"State FIPS {state_fips} Marketplace bronze APTC recipients",
            )
            bronze_constraint_specs = [
                ("state_fips", "==", str(state_fips)),
                (
                    "selected_marketplace_plan_benchmark_ratio",
                    "<",
                    str(BENCHMARK_SILVER_RATIO),
                ),
                ("used_aca_ptc", ">", "0"),
            ]
            bronze_stratum.constraints_rel = [
                StratumConstraint(
                    constraint_variable=variable,
                    operation=operation,
                    value=value,
                )
                for variable, operation, value in bronze_constraint_specs
            ]
            bronze_target = Target(
                variable="tax_unit_count",
                period=year,
                value=float(record.bronze_aptc_consumers),
                active=True,
                source=source_label,
                notes="HC.gov bronze plan selections among APTC consumers",
            )
            bronze_stratum.targets_rel.append(bronze_target)
            session.add(bronze_stratum)
            session.flush()

        session.commit()


def main() -> None:
    """Command-line entry point: extract, build and load the ACA targets.

    Raises:
        RuntimeError: If no targets could be built from the input CSV.
    """
    args, year = etl_argparser(
        "ETL for ACA marketplace bronze-selection calibration targets",
        extra_args_fn=_extra_args,
    )
    csv_path = args.state_metal_csv

    raw_state_metal = extract_aca_marketplace_state_metal_data(csv_path)
    targets_df = build_state_marketplace_bronze_aptc_targets(raw_state_metal)
    if targets_df.empty:
        raise RuntimeError("No HC.gov marketplace bronze/APTC targets were generated.")

    state_count = len(targets_df)
    print(
        "Loading ACA marketplace bronze/APTC state targets for "
        f"{state_count} states from {csv_path}"
    )
    load_state_marketplace_bronze_aptc_targets(targets_df, year)
    print("ACA marketplace bronze/APTC targets loaded.")


# Script entry point: run the ETL only when this module is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()
Loading
Loading