From 40559a35d79fabcf95f9e26e5a04d1f1d531e3ab Mon Sep 17 00:00:00 2001 From: elmartinj Date: Wed, 8 Apr 2026 17:22:26 -0600 Subject: [PATCH 1/4] Add initial CENACE pipeline integration --- src/cenace_pipeline.py | 69 +++++++++++++++ src/data/cenace/__init__.py | 0 src/data/cenace/aggregate/core.py | 48 +++++++++++ src/data/cenace/config.py | 15 ++++ src/data/cenace/transform/core.py | 0 src/data/cenace/utils/cenace_data.py | 88 +++++++++++++++++++ src/dataset_registry.py | 25 ++++++ src/evaluation/cenace/__init__.py | 0 src/evaluation/cenace/core.py | 92 ++++++++++++++++++++ src/evaluation/cenace/metrics.py | 47 +++++++++++ src/forecast/cenace/__init__.py | 0 src/forecast/cenace/core.py | 93 ++++++++++++++++++++ src/forecast/cenace/models.py | 122 +++++++++++++++++++++++++++ src/pipeline.py | 32 +++++++ 14 files changed, 631 insertions(+) create mode 100644 src/cenace_pipeline.py create mode 100644 src/data/cenace/__init__.py create mode 100644 src/data/cenace/aggregate/core.py create mode 100644 src/data/cenace/config.py create mode 100644 src/data/cenace/transform/core.py create mode 100644 src/data/cenace/utils/cenace_data.py create mode 100644 src/dataset_registry.py create mode 100644 src/evaluation/cenace/__init__.py create mode 100644 src/evaluation/cenace/core.py create mode 100644 src/evaluation/cenace/metrics.py create mode 100644 src/forecast/cenace/__init__.py create mode 100644 src/forecast/cenace/core.py create mode 100644 src/forecast/cenace/models.py create mode 100644 src/pipeline.py diff --git a/src/cenace_pipeline.py b/src/cenace_pipeline.py new file mode 100644 index 0000000..e437e7c --- /dev/null +++ b/src/cenace_pipeline.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import argparse + +import pandas as pd + +from src.data.cenace.aggregate.core import build_hourly_partitions +from src.evaluation.cenace.core import run_evaluation +from src.forecast.cenace.core import run_forecast + + +def run_cenace_pipeline( + cutoff: str, + model: str, + h: int = 24, + max_window_size: int = 48, + skip_aggregate: bool = False, +) -> tuple[str, str]: + try: + cutoff_ts = pd.Timestamp(cutoff) + except Exception as exc: + raise ValueError(f"Invalid cutoff timestamp: {cutoff}") from exc + + if not skip_aggregate: + n_written = build_hourly_partitions() + print(f"Aggregated {n_written} partitions") + + forecast_path = run_forecast( + cutoff=cutoff_ts, + model=model, + h=h, + max_window_size=max_window_size, + ) + print(f"Forecasts saved to: {forecast_path}") + + eval_path = run_evaluation( + cutoff=cutoff_ts, + model=model, + h=h, + max_window_size=max_window_size, + ) + print(f"Metrics saved to: {eval_path}") + + return str(forecast_path), str(eval_path) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument("--cutoff", required=True) + parser.add_argument("--model", required=True) + parser.add_argument("--h", type=int, default=24) + parser.add_argument("--max-window-size", type=int, default=48) + parser.add_argument("--skip-aggregate", action="store_true") + return parser.parse_args() + + +def main() -> None: + args = parse_args() + run_cenace_pipeline( + cutoff=args.cutoff, + model=args.model, + h=args.h, + max_window_size=args.max_window_size, + skip_aggregate=args.skip_aggregate, + ) + + +if __name__ == "__main__": + main() diff --git a/src/data/cenace/__init__.py b/src/data/cenace/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/data/cenace/aggregate/core.py b/src/data/cenace/aggregate/core.py new file mode 100644 index 0000000..99433f8 --- /dev/null +++ b/src/data/cenace/aggregate/core.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import pandas as pd + +from src.data.cenace.config import PROCESSED_CSV, PROCESSED_EVENTS_HOURLY_DIR + +INPUT_CSV = PROCESSED_CSV +OUTPUT_ROOT = PROCESSED_EVENTS_HOURLY_DIR + + +def build_hourly_partitions() -> int: + df = pd.read_csv(INPUT_CSV) + + df["ds"] = pd.to_datetime(df["ds"], errors="coerce") + df["y"] = pd.to_numeric(df["y"], errors="coerce") + + df = df.dropna(subset=["unique_id", "ds", "y"]).copy() + df = df.sort_values(["unique_id", "ds"]).drop_duplicates(["unique_id", "ds"]) + + df["year"] = df["ds"].dt.year + df["month"] = df["ds"].dt.month + df["day"] = df["ds"].dt.day + + OUTPUT_ROOT.mkdir(parents=True, exist_ok=True) + + n_written = 0 + for (year, month, day), part in df.groupby(["year", "month", "day"], sort=True): + part_dir = ( + OUTPUT_ROOT / f"year={year:04d}" / f"month={month:02d}" / f"day={day:02d}" + ) + part_dir.mkdir(parents=True, exist_ok=True) + + out_path = part_dir / "series.parquet" + part[["unique_id", "ds", "y"]].to_parquet(out_path, index=False) + + print(f"Saved: {out_path}") + n_written += 1 + + return n_written + + +def main() -> None: + n_written = build_hourly_partitions() + print(f"\nDone. Wrote {n_written} daily partitions.") + + +if __name__ == "__main__": + main() diff --git a/src/data/cenace/config.py b/src/data/cenace/config.py new file mode 100644 index 0000000..baf3893 --- /dev/null +++ b/src/data/cenace/config.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[3] + +DATA_ROOT = ROOT / "data" / "cenace" + +TMP_DIR = DATA_ROOT / "tmp" +PROCESSED_DIR = DATA_ROOT / "processed" +PROCESSED_CSV = PROCESSED_DIR / "cenace.csv" + +PROCESSED_EVENTS_HOURLY_DIR = DATA_ROOT / "processed-events" / "hourly" +FORECASTS_HOURLY_DIR = DATA_ROOT / "forecasts" / "hourly" +EVALUATIONS_HOURLY_DIR = DATA_ROOT / "evaluations" / "hourly" diff --git a/src/data/cenace/transform/core.py b/src/data/cenace/transform/core.py new file mode 100644 index 0000000..e69de29 diff --git a/src/data/cenace/utils/cenace_data.py b/src/data/cenace/utils/cenace_data.py new file mode 100644 index 0000000..5142720 --- /dev/null +++ b/src/data/cenace/utils/cenace_data.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + +import duckdb +import pandas as pd + + +@dataclass +class CENACEData: + base_path: Path + freq: str = "hourly" + h: int = 24 + max_window_size: int = 24 * 90 + + def __post_init__(self) -> None: + self.base_path = Path(self.base_path) + + def _date_to_partition(self, d: pd.Timestamp) -> Path: + return ( + self.base_path + / f"year={d.year:04d}" + / f"month={d.month:02d}" + / f"day={d.day:02d}" + / "series.parquet" + ) + + def _paths_for_range(self, start: pd.Timestamp, end: pd.Timestamp) -> list[str]: + days = pd.date_range(start.normalize(), end.normalize(), freq="D") + paths = [self._date_to_partition(d) for d in days] + existing = [str(p) for p in paths if p.exists()] + if not existing: + raise FileNotFoundError( + f"No parquet files found between {start} and \ + {end} under {self.base_path}" + ) + return existing + + def get_df( + self, + cutoff: str | pd.Timestamp, + max_window_size: int | None = None, + sort: bool = True, + ) -> pd.DataFrame: + cutoff = pd.Timestamp(cutoff) + window = max_window_size or self.max_window_size + start = cutoff - pd.Timedelta(hours=window - 1) + + paths = self._paths_for_range(start, cutoff) + + query = f""" + SELECT unique_id, ds, y + FROM read_parquet({paths}) + WHERE ds >= TIMESTAMP '{start}' + AND ds <= TIMESTAMP '{cutoff}' + """ + + df = duckdb.sql(query).df() + df["ds"] = pd.to_datetime(df["ds"]) + + if sort: + df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True) + + return df + + def get_actuals( + self, cutoff: str | pd.Timestamp, h: int | None = None + ) -> pd.DataFrame: + cutoff = pd.Timestamp(cutoff) + horizon = h or self.h + + start = cutoff + pd.Timedelta(hours=1) + end = cutoff + pd.Timedelta(hours=horizon) + + paths = self._paths_for_range(start, end) + + query = f""" + SELECT unique_id, ds, y + FROM read_parquet({paths}) + WHERE ds >= TIMESTAMP '{start}' + AND ds <= TIMESTAMP '{end}' + """ + + df = duckdb.sql(query).df() + df["ds"] = pd.to_datetime(df["ds"]) + df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True) + return df diff --git a/src/dataset_registry.py b/src/dataset_registry.py new file mode 100644 index 0000000..87e674c --- /dev/null +++ b/src/dataset_registry.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +from src.cenace_pipeline import run_cenace_pipeline + +DatasetRunner = Callable[..., tuple[str, str]] + +DATASET_REGISTRY: dict[str, DatasetRunner] = { + "cenace": run_cenace_pipeline, +} + +PIPELINE_DATASET_CHOICES = sorted(DATASET_REGISTRY) + + +def run_dataset_pipeline(dataset: str, **kwargs: Any) -> tuple[str, str]: + try: + runner = DATASET_REGISTRY[dataset] + except KeyError as exc: + raise ValueError( + f"Unsupported dataset: {dataset}. " f"Available: {PIPELINE_DATASET_CHOICES}" + ) from exc + + return runner(**kwargs) diff --git a/src/evaluation/cenace/__init__.py b/src/evaluation/cenace/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/evaluation/cenace/core.py b/src/evaluation/cenace/core.py new file mode 100644 index 0000000..b6207e9 --- /dev/null +++ b/src/evaluation/cenace/core.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +import pandas as pd + +from src.data.cenace.config import ( + EVALUATIONS_HOURLY_DIR, + FORECASTS_HOURLY_DIR, + PROCESSED_EVENTS_HOURLY_DIR, +) +from src.data.cenace.utils.cenace_data import CENACEData +from src.evaluation.cenace.metrics import evaluate_forecasts + + +def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path: + return ( + root + / f"year={cutoff.year:04d}" + / f"month={cutoff.month:02d}" + / f"day={cutoff.day:02d}" + ) + + +def run_evaluation( + cutoff: str | pd.Timestamp, + model: str, + h: int = 24, + max_window_size: int = 48, +) -> Path: + cutoff = pd.Timestamp(cutoff) + + data = CENACEData( + base_path=PROCESSED_EVENTS_HOURLY_DIR, + freq="hourly", + h=h, + max_window_size=max_window_size, + ) + + forecast_path = ( + FORECASTS_HOURLY_DIR + / model + / f"year={cutoff.year:04d}" + / f"month={cutoff.month:02d}" + / f"day={cutoff.day:02d}" + / "forecasts.parquet" + ) + + actuals = data.get_actuals(cutoff, h=h) + forecasts = pd.read_parquet(forecast_path) + + merged = forecasts.merge(actuals, on=["unique_id", "ds"], how="inner") + if merged.empty: + raise ValueError("Merged forecasts/actuals is empty") + + metrics = evaluate_forecasts(merged) + + eval_root = EVALUATIONS_HOURLY_DIR / model + out_dir = cutoff_partition(eval_root, cutoff) + out_dir.mkdir(parents=True, exist_ok=True) + out_path = out_dir / "metrics.parquet" + + metrics.to_parquet(out_path, index=False) + return out_path + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument("--cutoff", required=True) + parser.add_argument("--model", required=True) + parser.add_argument("--h", type=int, default=24) + parser.add_argument("--max-window-size", type=int, default=48) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + out_path = run_evaluation( + cutoff=args.cutoff, + model=args.model, + h=args.h, + max_window_size=args.max_window_size, + ) + metrics = pd.read_parquet(out_path) + print(f"Saved metrics: {out_path}") + print(metrics.head()) + print(metrics.shape) + + +if __name__ == "__main__": + main() diff --git a/src/evaluation/cenace/metrics.py b/src/evaluation/cenace/metrics.py new file mode 100644 index 0000000..435fe70 --- /dev/null +++ b/src/evaluation/cenace/metrics.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import pandas as pd + + +def mae(y_true: pd.Series, y_pred: pd.Series) -> float: + return (y_true - y_pred).abs().mean() + + +def rmse(y_true: pd.Series, y_pred: pd.Series) -> float: + return ((y_true - y_pred) ** 2).mean() ** 0.5 + + +def smape(y_true: pd.Series, y_pred: pd.Series) -> float: + denom = (y_true.abs() + y_pred.abs()) / 2 + out = (y_true - y_pred).abs() / denom + out = out.where(denom != 0, 0.0) + return 100 * out.mean() + + +def evaluate_forecasts(merged: pd.DataFrame) -> pd.DataFrame: + per_uid = ( + merged.groupby("unique_id", as_index=False) + .apply( + lambda g: pd.Series( + { + "mae": mae(g["y"], g["y_hat"]), + "rmse": rmse(g["y"], g["y_hat"]), + "smape": smape(g["y"], g["y_hat"]), + } + ) + ) + .reset_index(drop=True) + ) + + overall = pd.DataFrame( + [ + { + "unique_id": "__overall__", + "mae": mae(merged["y"], merged["y_hat"]), + "rmse": rmse(merged["y"], merged["y_hat"]), + "smape": smape(merged["y"], merged["y_hat"]), + } + ] + ) + + return pd.concat([per_uid, overall], ignore_index=True) diff --git a/src/forecast/cenace/__init__.py b/src/forecast/cenace/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/forecast/cenace/core.py b/src/forecast/cenace/core.py new file mode 100644 index 0000000..1bc8b72 --- /dev/null +++ b/src/forecast/cenace/core.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +import pandas as pd + +from src.data.cenace.config import FORECASTS_HOURLY_DIR, PROCESSED_EVENTS_HOURLY_DIR +from src.data.cenace.utils.cenace_data import CENACEData +from src.forecast.cenace.models import MODEL_REGISTRY + + +def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path: + return ( + root + / f"year={cutoff.year:04d}" + / f"month={cutoff.month:02d}" + / f"day={cutoff.day:02d}" + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument("--cutoff", required=True) + parser.add_argument("--model", required=True, choices=sorted(MODEL_REGISTRY)) + parser.add_argument("--h", type=int, default=24) + parser.add_argument("--max-window-size", type=int, default=48) + return parser.parse_args() + + +def run_forecast( + cutoff: str | pd.Timestamp, + model: str, + h: int = 24, + max_window_size: int = 48, +) -> Path: + cutoff = pd.Timestamp(cutoff) + + data = CENACEData( + base_path=PROCESSED_EVENTS_HOURLY_DIR, + freq="hourly", + h=h, + max_window_size=max_window_size, + ) + + train = data.get_df(cutoff, max_window_size=max_window_size) + + if model not in MODEL_REGISTRY: + raise ValueError( + f"Unknown CENACE model: {model}. " f"Available: {sorted(MODEL_REGISTRY)}" + ) + + model_fn = MODEL_REGISTRY[model] + forecasts = model_fn(train, cutoff=cutoff, h=h) + + forecast_root = FORECASTS_HOURLY_DIR / model + out_dir = cutoff_partition(forecast_root, cutoff) + out_dir.mkdir(parents=True, exist_ok=True) + out_path = out_dir / "forecasts.parquet" + + forecasts.to_parquet(out_path, index=False) + return out_path + + +def main() -> None: + args = parse_args() + cutoff = pd.Timestamp(args.cutoff) + + data = CENACEData( + base_path=PROCESSED_EVENTS_HOURLY_DIR, + freq="hourly", + h=args.h, + max_window_size=args.max_window_size, + ) + + train = data.get_df(cutoff, max_window_size=args.max_window_size) + model_fn = MODEL_REGISTRY[args.model] + forecasts = model_fn(train, cutoff=cutoff, h=args.h) + + forecast_root = FORECASTS_HOURLY_DIR / args.model + out_dir = cutoff_partition(forecast_root, cutoff) + out_dir.mkdir(parents=True, exist_ok=True) + out_path = out_dir / "forecasts.parquet" + + forecasts.to_parquet(out_path, index=False) + + print(f"Saved forecasts: {out_path}") + print(forecasts.head()) + print(forecasts.shape) + + +if __name__ == "__main__": + main() diff --git a/src/forecast/cenace/models.py b/src/forecast/cenace/models.py new file mode 100644 index 0000000..3a11d21 --- /dev/null +++ b/src/forecast/cenace/models.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +import pandas as pd + +from src.forecast.forecast import generate_forecast + + +def naive_last_value(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame: + cutoff = pd.Timestamp(cutoff) + + last_values = ( + train_df.sort_values(["unique_id", "ds"]) + .groupby("unique_id", as_index=False) + .tail(1)[["unique_id", "y"]] + .rename(columns={"y": "y_hat"}) + ) + + future_ds = pd.date_range( + cutoff + pd.Timedelta(hours=1), + periods=h, + freq="h", + ) + + out = [] + for ds in future_ds: + tmp = last_values.copy() + tmp["ds"] = ds + out.append(tmp) + + fcst = pd.concat(out, ignore_index=True) + return ( + fcst[["unique_id", "ds", "y_hat"]] + .sort_values(["unique_id", "ds"]) + .reset_index(drop=True) + ) + + +def seasonal_naive_24(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame: + cutoff = pd.Timestamp(cutoff) + + expected_start = cutoff - pd.Timedelta(hours=23) + last_day = train_df.loc[ + (train_df["ds"] >= expected_start) & (train_df["ds"] <= cutoff), + ["unique_id", "ds", "y"], + ].copy() + + counts = last_day.groupby("unique_id")["ds"].count() + bad_ids = counts[counts != 24] + if not bad_ids.empty: + raise ValueError( + "seasonal_naive_24 needs exactly 24 hourly observations in the last day " + f"for every series. Bad series count: {len(bad_ids)}" + ) + + last_day["hour_ahead"] = ( + (last_day["ds"] - expected_start) / pd.Timedelta(hours=1) + ).astype(int) + profile = last_day[["unique_id", "hour_ahead", "y"]].rename(columns={"y": "y_hat"}) + + unique_ids = profile["unique_id"].drop_duplicates().sort_values().tolist() + future_ds = pd.date_range(cutoff + pd.Timedelta(hours=1), periods=h, freq="h") + + future_index = pd.DataFrame( + [(uid, ds, i % 24) for i, ds in enumerate(future_ds) for uid in unique_ids], + columns=["unique_id", "ds", "hour_ahead"], + ) + + fcst = ( + future_index.merge(profile, on=["unique_id", "hour_ahead"], how="left")[ + ["unique_id", "ds", "y_hat"] + ] + .sort_values(["unique_id", "ds"]) + .reset_index(drop=True) + ) + + if fcst["y_hat"].isna().any(): + raise ValueError( + "seasonal_naive_24" + " produced missing forecasts after profile merge." + ) + + return fcst + + +def auto_arima(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame: + cutoff = pd.Timestamp(cutoff) + + raw = generate_forecast( + model_name="auto_arima", + df=train_df, + h=h, + freq="h", + ).copy() + + point_candidates = [ + col + for col in raw.columns + if col not in {"unique_id", "ds"} and "-q-" not in col + ] + if "auto_arima" in point_candidates: + point_col = "auto_arima" + elif len(point_candidates) == 1: + point_col = point_candidates[0] + else: + raise ValueError( + "Could not detect AutoARIMA point forecast column. " + f"Candidates found: {point_candidates}" + ) + + fcst = ( + raw[["unique_id", "ds", point_col]] + .rename(columns={point_col: "y_hat"}) + .sort_values(["unique_id", "ds"]) + .reset_index(drop=True) + ) + return fcst + + +MODEL_REGISTRY = { + "naive_last_value": naive_last_value, + "seasonal_naive_24": seasonal_naive_24, + "auto_arima": auto_arima, +} diff --git a/src/pipeline.py b/src/pipeline.py new file mode 100644 index 0000000..28bc14b --- /dev/null +++ b/src/pipeline.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import argparse + +from src.dataset_registry import PIPELINE_DATASET_CHOICES, run_dataset_pipeline + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument("--dataset", required=True, choices=PIPELINE_DATASET_CHOICES) + parser.add_argument("--model", required=True) + parser.add_argument("--cutoff", required=True) + parser.add_argument("--h", type=int, default=24) + parser.add_argument("--max-window-size", type=int, default=48) + parser.add_argument("--skip-aggregate", action="store_true") + return parser.parse_args() + + +def main() -> None: + args = parse_args() + run_dataset_pipeline( + dataset=args.dataset, + cutoff=args.cutoff, + model=args.model, + h=args.h, + max_window_size=args.max_window_size, + skip_aggregate=args.skip_aggregate, + ) + + +if __name__ == "__main__": + main() From 157587711987842e0b1c5923c6ec79cee4473c1f Mon Sep 17 00:00:00 2001 From: elmartinj Date: Mon, 20 Apr 2026 17:18:16 -0600 Subject: [PATCH 2/4] Reshape CENACE forecast and evaluation to follow GH structure --- src/cenace_pipeline.py | 69 ----------------- src/dataset_registry.py | 25 ------- src/evaluation/cenace/metrics.py | 47 ------------ src/forecast/cenace/models.py | 122 ------------------------------- src/pipeline.py | 32 -------- 5 files changed, 295 deletions(-) delete mode 100644 src/cenace_pipeline.py delete mode 100644 src/dataset_registry.py delete mode 100644 src/evaluation/cenace/metrics.py delete mode 100644 src/forecast/cenace/models.py delete mode 100644 src/pipeline.py diff --git a/src/cenace_pipeline.py b/src/cenace_pipeline.py deleted file mode 100644 index e437e7c..0000000 --- a/src/cenace_pipeline.py +++ /dev/null @@ -1,69 +0,0 @@ -from __future__ import annotations - -import argparse - -import pandas as pd - -from src.data.cenace.aggregate.core import build_hourly_partitions -from src.evaluation.cenace.core import run_evaluation -from src.forecast.cenace.core import run_forecast - - -def run_cenace_pipeline( - cutoff: str, - model: str, - h: int = 24, - max_window_size: int = 48, - skip_aggregate: bool = False, -) -> tuple[str, str]: - try: - cutoff_ts = pd.Timestamp(cutoff) - except Exception as exc: - raise ValueError(f"Invalid cutoff timestamp: {cutoff}") from exc - - if not skip_aggregate: - n_written = build_hourly_partitions() - print(f"Aggregated {n_written} partitions") - - forecast_path = run_forecast( - cutoff=cutoff_ts, - model=model, - h=h, - max_window_size=max_window_size, - ) - print(f"Forecasts saved to: {forecast_path}") - - eval_path = run_evaluation( - cutoff=cutoff_ts, - model=model, - h=h, - max_window_size=max_window_size, - ) - print(f"Metrics saved to: {eval_path}") - - return str(forecast_path), str(eval_path) - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser() - parser.add_argument("--cutoff", required=True) - parser.add_argument("--model", required=True) - parser.add_argument("--h", type=int, default=24) - parser.add_argument("--max-window-size", type=int, default=48) - parser.add_argument("--skip-aggregate", action="store_true") - return parser.parse_args() - - -def main() -> None: - args = parse_args() - run_cenace_pipeline( - cutoff=args.cutoff, - model=args.model, - h=args.h, - max_window_size=args.max_window_size, - skip_aggregate=args.skip_aggregate, - ) - - -if __name__ == "__main__": - main() diff --git a/src/dataset_registry.py b/src/dataset_registry.py deleted file mode 100644 index 87e674c..0000000 --- a/src/dataset_registry.py +++ /dev/null @@ -1,25 +0,0 @@ -from __future__ import annotations - -from collections.abc import Callable -from typing import Any - -from src.cenace_pipeline import run_cenace_pipeline - -DatasetRunner = Callable[..., tuple[str, str]] - -DATASET_REGISTRY: dict[str, DatasetRunner] = { - "cenace": run_cenace_pipeline, -} - -PIPELINE_DATASET_CHOICES = sorted(DATASET_REGISTRY) - - -def run_dataset_pipeline(dataset: str, **kwargs: Any) -> tuple[str, str]: - try: - runner = DATASET_REGISTRY[dataset] - except KeyError as exc: - raise ValueError( - f"Unsupported dataset: {dataset}. " f"Available: {PIPELINE_DATASET_CHOICES}" - ) from exc - - return runner(**kwargs) diff --git a/src/evaluation/cenace/metrics.py b/src/evaluation/cenace/metrics.py deleted file mode 100644 index 435fe70..0000000 --- a/src/evaluation/cenace/metrics.py +++ /dev/null @@ -1,47 +0,0 @@ -from __future__ import annotations - -import pandas as pd - - -def mae(y_true: pd.Series, y_pred: pd.Series) -> float: - return (y_true - y_pred).abs().mean() - - -def rmse(y_true: pd.Series, y_pred: pd.Series) -> float: - return ((y_true - y_pred) ** 2).mean() ** 0.5 - - -def smape(y_true: pd.Series, y_pred: pd.Series) -> float: - denom = (y_true.abs() + y_pred.abs()) / 2 - out = (y_true - y_pred).abs() / denom - out = out.where(denom != 0, 0.0) - return 100 * out.mean() - - -def evaluate_forecasts(merged: pd.DataFrame) -> pd.DataFrame: - per_uid = ( - merged.groupby("unique_id", as_index=False) - .apply( - lambda g: pd.Series( - { - "mae": mae(g["y"], g["y_hat"]), - "rmse": rmse(g["y"], g["y_hat"]), - "smape": smape(g["y"], g["y_hat"]), - } - ) - ) - .reset_index(drop=True) - ) - - overall = pd.DataFrame( - [ - { - "unique_id": "__overall__", - "mae": mae(merged["y"], merged["y_hat"]), - "rmse": rmse(merged["y"], merged["y_hat"]), - "smape": smape(merged["y"], merged["y_hat"]), - } - ] - ) - - return pd.concat([per_uid, overall], ignore_index=True) diff --git a/src/forecast/cenace/models.py b/src/forecast/cenace/models.py deleted file mode 100644 index 3a11d21..0000000 --- a/src/forecast/cenace/models.py +++ /dev/null @@ -1,122 +0,0 @@ -from __future__ import annotations - -import pandas as pd - -from src.forecast.forecast import generate_forecast - - -def naive_last_value(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame: - cutoff = pd.Timestamp(cutoff) - - last_values = ( - train_df.sort_values(["unique_id", "ds"]) - .groupby("unique_id", as_index=False) - .tail(1)[["unique_id", "y"]] - .rename(columns={"y": "y_hat"}) - ) - - future_ds = pd.date_range( - cutoff + pd.Timedelta(hours=1), - periods=h, - freq="h", - ) - - out = [] - for ds in future_ds: - tmp = last_values.copy() - tmp["ds"] = ds - out.append(tmp) - - fcst = pd.concat(out, ignore_index=True) - return ( - fcst[["unique_id", "ds", "y_hat"]] - .sort_values(["unique_id", "ds"]) - .reset_index(drop=True) - ) - - -def seasonal_naive_24(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame: - cutoff = pd.Timestamp(cutoff) - - expected_start = cutoff - pd.Timedelta(hours=23) - last_day = train_df.loc[ - (train_df["ds"] >= expected_start) & (train_df["ds"] <= cutoff), - ["unique_id", "ds", "y"], - ].copy() - - counts = last_day.groupby("unique_id")["ds"].count() - bad_ids = counts[counts != 24] - if not bad_ids.empty: - raise ValueError( - "seasonal_naive_24 needs exactly 24 hourly observations in the last day " - f"for every series. Bad series count: {len(bad_ids)}" - ) - - last_day["hour_ahead"] = ( - (last_day["ds"] - expected_start) / pd.Timedelta(hours=1) - ).astype(int) - profile = last_day[["unique_id", "hour_ahead", "y"]].rename(columns={"y": "y_hat"}) - - unique_ids = profile["unique_id"].drop_duplicates().sort_values().tolist() - future_ds = pd.date_range(cutoff + pd.Timedelta(hours=1), periods=h, freq="h") - - future_index = pd.DataFrame( - [(uid, ds, i % 24) for i, ds in enumerate(future_ds) for uid in unique_ids], - columns=["unique_id", "ds", "hour_ahead"], - ) - - fcst = ( - future_index.merge(profile, on=["unique_id", "hour_ahead"], how="left")[ - ["unique_id", "ds", "y_hat"] - ] - .sort_values(["unique_id", "ds"]) - .reset_index(drop=True) - ) - - if fcst["y_hat"].isna().any(): - raise ValueError( - "seasonal_naive_24" + " produced missing forecasts after profile merge." - ) - - return fcst - - -def auto_arima(train_df: pd.DataFrame, cutoff: str, h: int = 24) -> pd.DataFrame: - cutoff = pd.Timestamp(cutoff) - - raw = generate_forecast( - model_name="auto_arima", - df=train_df, - h=h, - freq="h", - ).copy() - - point_candidates = [ - col - for col in raw.columns - if col not in {"unique_id", "ds"} and "-q-" not in col - ] - if "auto_arima" in point_candidates: - point_col = "auto_arima" - elif len(point_candidates) == 1: - point_col = point_candidates[0] - else: - raise ValueError( - "Could not detect AutoARIMA point forecast column. " - f"Candidates found: {point_candidates}" - ) - - fcst = ( - raw[["unique_id", "ds", point_col]] - .rename(columns={point_col: "y_hat"}) - .sort_values(["unique_id", "ds"]) - .reset_index(drop=True) - ) - return fcst - - -MODEL_REGISTRY = { - "naive_last_value": naive_last_value, - "seasonal_naive_24": seasonal_naive_24, - "auto_arima": auto_arima, -} diff --git a/src/pipeline.py b/src/pipeline.py deleted file mode 100644 index 28bc14b..0000000 --- a/src/pipeline.py +++ /dev/null @@ -1,32 +0,0 @@ -from __future__ import annotations - -import argparse - -from src.dataset_registry import PIPELINE_DATASET_CHOICES, run_dataset_pipeline - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser() - parser.add_argument("--dataset", required=True, choices=PIPELINE_DATASET_CHOICES) - parser.add_argument("--model", required=True) - parser.add_argument("--cutoff", required=True) - parser.add_argument("--h", type=int, default=24) - parser.add_argument("--max-window-size", type=int, default=48) - parser.add_argument("--skip-aggregate", action="store_true") - return parser.parse_args() - - -def main() -> None: - args = parse_args() - run_dataset_pipeline( - dataset=args.dataset, - cutoff=args.cutoff, - model=args.model, - h=args.h, - max_window_size=args.max_window_size, - skip_aggregate=args.skip_aggregate, - ) - - -if __name__ == "__main__": - main() From cb21d1e5eedb2afd3fae03e79ce4c16b0418750b Mon Sep 17 00:00:00 2001 From: elmartinj Date: Mon, 20 Apr 2026 17:42:37 -0600 Subject: [PATCH 3/4] Align CENACE forecast and evaluation with GH structure --- src/data/cenace/extract/core.py | 107 ++++++++++++++++++++++++++++++++ src/evaluation/cenace/core.py | 14 +++-- src/forecast/cenace/core.py | 44 +++++-------- 3 files changed, 131 insertions(+), 34 deletions(-) create mode 100644 src/data/cenace/extract/core.py diff --git a/src/data/cenace/extract/core.py b/src/data/cenace/extract/core.py new file mode 100644 index 0000000..3388b7a --- /dev/null +++ b/src/data/cenace/extract/core.py @@ -0,0 +1,107 @@ +import argparse +from datetime import datetime, timedelta +from pathlib import Path +import requests +from bs4 import BeautifulSoup +import zipfile + +URL = "https://www.cenace.gob.mx/Paginas/SIM/Reportes/PreEnerServConMTR.aspx" + +session = requests.Session() + +HEADERS = { + "User-Agent": "Mozilla/5.0", + "Referer": URL, + "Origin": "https://www.cenace.gob.mx", + "Content-Type": "application/x-www-form-urlencoded", +} + +# repo root = imper/ +ROOT_DIR = Path(__file__).resolve().parents[4] +DEFAULT_BASE_DIR = ROOT_DIR / "data" / "cenace" + + +def get_form_state(): + r = session.get(URL, headers=HEADERS) + soup = BeautifulSoup(r.text, "html.parser") + + def get_value(name): + el = soup.find("input", {"name": name}) + return el.get("value") if el else "" + + return { + "__VIEWSTATE": get_value("__VIEWSTATE"), + "__VIEWSTATEGENERATOR": get_value("__VIEWSTATEGENERATOR"), + "__VIEWSTATEENCRYPTED": get_value("__VIEWSTATEENCRYPTED"), + "__EVENTVALIDATION": get_value("__EVENTVALIDATION"), + } + + +def download_and_extract(date, raw_dir, tmp_dir): + date_str = date.strftime("%d/%m/%Y") + period_str = f"{date_str} - {date_str}" + + state = get_form_state() + + payload = { + "ctl00$ContentPlaceHolder1$ddlReporte": "362,325", + "ctl00$ContentPlaceHolder1$ddlPeriodicidad": "D", + "ctl00$ContentPlaceHolder1$ddlSistema": "SIN", + "ctl00$ContentPlaceHolder1$txtPeriodo": period_str, + "ctl00$ContentPlaceHolder1$hdfStartDateSelected": date_str, + "ctl00$ContentPlaceHolder1$hdfEndDateSelected": date_str, + "ctl00$ContentPlaceHolder1$btnDescargarZIP": "Descargar ZIP", + "__VIEWSTATE": state["__VIEWSTATE"], + "__VIEWSTATEGENERATOR": state["__VIEWSTATEGENERATOR"], + "__VIEWSTATEENCRYPTED": state["__VIEWSTATEENCRYPTED"], + "__EVENTVALIDATION": state["__EVENTVALIDATION"], + "__EVENTTARGET": "", + "__EVENTARGUMENT": "", + } + + r = session.post(URL, data=payload, headers=HEADERS) + + size = len(r.content) + print(f"{date_str} | {size} bytes") + + if size < 10000: + print(f"Skipping {date_str}") + return + + raw_dir.mkdir(parents=True, exist_ok=True) + tmp_dir.mkdir(parents=True, exist_ok=True) + + zip_path = raw_dir / f"{date.strftime('%Y%m%d')}.zip" + + with open(zip_path, "wb") as f: + f.write(r.content) + + with zipfile.ZipFile(zip_path, "r") as z: + z.extractall(tmp_dir) + + +def run(start_date, end_date, base_dir): + raw_dir = base_dir / "raw" + tmp_dir = base_dir / "tmp" + + current = start_date + while current <= end_date: + try: + download_and_extract(current, raw_dir, tmp_dir) + except Exception as e: + print(f"Error on {current.strftime('%Y-%m-%d')}: {e}") + current += timedelta(days=1) + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--start-date", default="2023-01-01") + parser.add_argument("--end-date", required=True) + parser.add_argument("--out", default=str(DEFAULT_BASE_DIR)) + + args = parser.parse_args() + + run( + start_date=datetime.strptime(args.start_date, "%Y-%m-%d"), + end_date=datetime.strptime(args.end_date, "%Y-%m-%d"), + base_dir=Path(args.out).resolve(), + ) diff --git a/src/evaluation/cenace/core.py b/src/evaluation/cenace/core.py index b6207e9..1257ac1 100644 --- a/src/evaluation/cenace/core.py +++ b/src/evaluation/cenace/core.py @@ -11,7 +11,7 @@ PROCESSED_EVENTS_HOURLY_DIR, ) from src.data.cenace.utils.cenace_data import CENACEData -from src.evaluation.cenace.metrics import evaluate_forecasts +from src.evaluation.evaluate import evaluate_forecast def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path: @@ -47,14 +47,16 @@ def run_evaluation( / "forecasts.parquet" ) + train = data.get_df(cutoff, max_window_size=max_window_size) actuals = data.get_actuals(cutoff, h=h) forecasts = pd.read_parquet(forecast_path) - merged = forecasts.merge(actuals, on=["unique_id", "ds"], how="inner") - if merged.empty: - raise ValueError("Merged forecasts/actuals is empty") - - metrics = evaluate_forecasts(merged) + metrics, _ = evaluate_forecast( + forecast_df=forecasts, + actuals_df=actuals, + train_df=train, + seasonality=24, + ) eval_root = EVALUATIONS_HOURLY_DIR / model out_dir = cutoff_partition(eval_root, cutoff) diff --git a/src/forecast/cenace/core.py b/src/forecast/cenace/core.py index 1bc8b72..2b31af1 100644 --- a/src/forecast/cenace/core.py +++ b/src/forecast/cenace/core.py @@ -7,7 +7,7 @@ from src.data.cenace.config import FORECASTS_HOURLY_DIR, PROCESSED_EVENTS_HOURLY_DIR from src.data.cenace.utils.cenace_data import CENACEData -from src.forecast.cenace.models import MODEL_REGISTRY +from src.forecast.forecast import generate_forecast def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path: @@ -22,7 +22,7 @@ def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path: def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument("--cutoff", required=True) - parser.add_argument("--model", required=True, choices=sorted(MODEL_REGISTRY)) + parser.add_argument("--model", required=True) parser.add_argument("--h", type=int, default=24) parser.add_argument("--max-window-size", type=int, default=48) return parser.parse_args() @@ -45,13 +45,17 @@ def run_forecast( train = data.get_df(cutoff, max_window_size=max_window_size) - if model not in MODEL_REGISTRY: - raise ValueError( - f"Unknown CENACE model: {model}. " f"Available: {sorted(MODEL_REGISTRY)}" - ) + model_name = "seasonal_naive" if model == "seasonal_naive_24" else model - model_fn = MODEL_REGISTRY[model] - forecasts = model_fn(train, cutoff=cutoff, h=h) + forecasts = generate_forecast( + model_name=model_name, + df=train, + h=h, + freq="h", + ) + + if "y_hat" in forecasts.columns: + forecasts = forecasts.rename(columns={"y_hat": model}) forecast_root = FORECASTS_HOURLY_DIR / model out_dir = cutoff_partition(forecast_root, cutoff) @@ -64,29 +68,13 @@ def run_forecast( def main() -> None: args = parse_args() - cutoff = pd.Timestamp(args.cutoff) - - data = CENACEData( - base_path=PROCESSED_EVENTS_HOURLY_DIR, - freq="hourly", + out_path = run_forecast( + cutoff=args.cutoff, + model=args.model, h=args.h, max_window_size=args.max_window_size, ) - - train = data.get_df(cutoff, max_window_size=args.max_window_size) - model_fn = MODEL_REGISTRY[args.model] - forecasts = model_fn(train, cutoff=cutoff, h=args.h) - - forecast_root = FORECASTS_HOURLY_DIR / args.model - out_dir = cutoff_partition(forecast_root, cutoff) - out_dir.mkdir(parents=True, exist_ok=True) - out_path = out_dir / "forecasts.parquet" - - forecasts.to_parquet(out_path, index=False) - - print(f"Saved forecasts: {out_path}") - print(forecasts.head()) - print(forecasts.shape) + print(f"Forecasts saved to: {out_path}") if __name__ == "__main__": From 3c2bf0eccba5cbb028af3e52c58da2dd0d22ded2 Mon Sep 17 00:00:00 2001 From: elmartinj Date: Mon, 20 Apr 2026 17:45:22 -0600 Subject: [PATCH 4/4] Add CENACE extractor with execution-date and backfill logic --- src/data/cenace/extract/core.py | 106 +++++++++++++++++++++++++------- 1 file changed, 84 insertions(+), 22 deletions(-) diff --git a/src/data/cenace/extract/core.py b/src/data/cenace/extract/core.py index 3388b7a..e9b633f 100644 --- a/src/data/cenace/extract/core.py +++ b/src/data/cenace/extract/core.py @@ -1,9 +1,12 @@ +from __future__ import annotations + import argparse from datetime import datetime, timedelta from pathlib import Path +import zipfile + import requests from bs4 import BeautifulSoup -import zipfile URL = "https://www.cenace.gob.mx/Paginas/SIM/Reportes/PreEnerServConMTR.aspx" @@ -16,16 +19,25 @@ "Content-Type": "application/x-www-form-urlencoded", } -# repo root = imper/ +# repo root = impermanent/ ROOT_DIR = Path(__file__).resolve().parents[4] DEFAULT_BASE_DIR = ROOT_DIR / "data" / "cenace" -def get_form_state(): +def target_date_for_execution(execution_date: datetime) -> datetime: + return execution_date + timedelta(days=1) + + +def raw_zip_path(date: datetime, raw_dir: Path) -> Path: + return raw_dir / f"{date.strftime('%Y%m%d')}.zip" + + +def get_form_state() -> dict[str, str]: r = session.get(URL, headers=HEADERS) + r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") - def get_value(name): + def get_value(name: str) -> str: el = soup.find("input", {"name": name}) return el.get("value") if el else "" @@ -37,7 +49,7 @@ def get_value(name): } -def download_and_extract(date, raw_dir, tmp_dir): +def download_and_extract(date: datetime, raw_dir: Path, tmp_dir: Path) -> bool: date_str = date.strftime("%d/%m/%Y") period_str = f"{date_str} - {date_str}" @@ -60,18 +72,19 @@ def download_and_extract(date, raw_dir, tmp_dir): } r = session.post(URL, data=payload, headers=HEADERS) + r.raise_for_status() size = len(r.content) print(f"{date_str} | {size} bytes") if size < 10000: - print(f"Skipping {date_str}") - return + print(f"Skipping {date_str}: file not published or response too small") + return False raw_dir.mkdir(parents=True, exist_ok=True) tmp_dir.mkdir(parents=True, exist_ok=True) - zip_path = raw_dir / f"{date.strftime('%Y%m%d')}.zip" + zip_path = raw_zip_path(date, raw_dir) with open(zip_path, "wb") as f: f.write(r.content) @@ -79,29 +92,78 @@ def download_and_extract(date, raw_dir, tmp_dir): with zipfile.ZipFile(zip_path, "r") as z: z.extractall(tmp_dir) + return True -def run(start_date, end_date, base_dir): + +def backfill_missing(start_date: datetime, end_date: datetime, base_dir: Path) -> None: raw_dir = base_dir / "raw" tmp_dir = base_dir / "tmp" current = start_date while current <= end_date: - try: - download_and_extract(current, raw_dir, tmp_dir) - except Exception as e: - print(f"Error on {current.strftime('%Y-%m-%d')}: {e}") + zip_path = raw_zip_path(current, raw_dir) + if zip_path.exists(): + print(f"Already have {current.strftime('%Y-%m-%d')}, skipping") + else: + try: + ok = download_and_extract(current, raw_dir, tmp_dir) + if not ok: + print(f"Stopping at {current.strftime('%Y-%m-%d')}") + break + except Exception as e: + print(f"Error on {current.strftime('%Y-%m-%d')}: {e}") + break current += timedelta(days=1) -if __name__ == "__main__": + +def run_execution_date(execution_date: datetime, base_dir: Path) -> bool: + raw_dir = base_dir / "raw" + tmp_dir = base_dir / "tmp" + target_date = target_date_for_execution(execution_date) + + zip_path = raw_zip_path(target_date, raw_dir) + if zip_path.exists(): + print(f"Already have {target_date.strftime('%Y-%m-%d')}, skipping") + return True + + return download_and_extract(target_date, raw_dir, tmp_dir) + + +def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() - parser.add_argument("--start-date", default="2023-01-01") - parser.add_argument("--end-date", required=True) + parser.add_argument("--execution-date", default=None) + parser.add_argument("--start-date", default=None) + parser.add_argument("--end-date", default=None) parser.add_argument("--out", default=str(DEFAULT_BASE_DIR)) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + base_dir = Path(args.out).resolve() + + if args.start_date: + start_date = datetime.strptime(args.start_date, "%Y-%m-%d") + if args.end_date: + end_date = datetime.strptime(args.end_date, "%Y-%m-%d") + elif args.execution_date: + end_date = target_date_for_execution( + datetime.strptime(args.execution_date, "%Y-%m-%d") + ) + else: + end_date = datetime.today() + backfill_missing(start_date=start_date, end_date=end_date, base_dir=base_dir) + return + + if args.execution_date: + execution_date = datetime.strptime(args.execution_date, "%Y-%m-%d") + ok = run_execution_date(execution_date=execution_date, base_dir=base_dir) + if not ok: + print("No new CENACE publication detected; stopping cleanly") + return - args = parser.parse_args() + raise ValueError("Provide either --start-date or --execution-date") - run( - start_date=datetime.strptime(args.start_date, "%Y-%m-%d"), - end_date=datetime.strptime(args.end_date, "%Y-%m-%d"), - base_dir=Path(args.out).resolve(), - ) + +if __name__ == "__main__": + main()