diff --git a/src/data/cenace/__init__.py b/src/data/cenace/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/data/cenace/aggregate/core.py b/src/data/cenace/aggregate/core.py
new file mode 100644
index 0000000..99433f8
--- /dev/null
+++ b/src/data/cenace/aggregate/core.py
@@ -0,0 +1,48 @@
+from __future__ import annotations
+
+import pandas as pd
+
+from src.data.cenace.config import PROCESSED_CSV, PROCESSED_EVENTS_HOURLY_DIR
+
+INPUT_CSV = PROCESSED_CSV
+OUTPUT_ROOT = PROCESSED_EVENTS_HOURLY_DIR
+
+
+def build_hourly_partitions() -> int:
+    df = pd.read_csv(INPUT_CSV)
+
+    df["ds"] = pd.to_datetime(df["ds"], errors="coerce")
+    df["y"] = pd.to_numeric(df["y"], errors="coerce")
+
+    df = df.dropna(subset=["unique_id", "ds", "y"]).copy()
+    df = df.sort_values(["unique_id", "ds"]).drop_duplicates(["unique_id", "ds"])
+
+    df["year"] = df["ds"].dt.year
+    df["month"] = df["ds"].dt.month
+    df["day"] = df["ds"].dt.day
+
+    OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)
+
+    n_written = 0
+    for (year, month, day), part in df.groupby(["year", "month", "day"], sort=True):
+        part_dir = (
+            OUTPUT_ROOT / f"year={year:04d}" / f"month={month:02d}" / f"day={day:02d}"
+        )
+        part_dir.mkdir(parents=True, exist_ok=True)
+
+        out_path = part_dir / "series.parquet"
+        part[["unique_id", "ds", "y"]].to_parquet(out_path, index=False)
+
+        print(f"Saved: {out_path}")
+        n_written += 1
+
+    return n_written
+
+
+def main() -> None:
+    n_written = build_hourly_partitions()
+    print(f"\nDone. Wrote {n_written} daily partitions.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/data/cenace/config.py b/src/data/cenace/config.py
new file mode 100644
index 0000000..baf3893
--- /dev/null
+++ b/src/data/cenace/config.py
@@ -0,0 +1,15 @@
+from __future__ import annotations
+
+from pathlib import Path
+
+ROOT = Path(__file__).resolve().parents[3]
+
+DATA_ROOT = ROOT / "data" / "cenace"
+
+TMP_DIR = DATA_ROOT / "tmp"
+PROCESSED_DIR = DATA_ROOT / "processed"
+PROCESSED_CSV = PROCESSED_DIR / "cenace.csv"
+
+PROCESSED_EVENTS_HOURLY_DIR = DATA_ROOT / "processed-events" / "hourly"
+FORECASTS_HOURLY_DIR = DATA_ROOT / "forecasts" / "hourly"
+EVALUATIONS_HOURLY_DIR = DATA_ROOT / "evaluations" / "hourly"
diff --git a/src/data/cenace/extract/core.py b/src/data/cenace/extract/core.py
new file mode 100644
index 0000000..e9b633f
--- /dev/null
+++ b/src/data/cenace/extract/core.py
@@ -0,0 +1,169 @@
+from __future__ import annotations
+
+import argparse
+from datetime import datetime, timedelta
+from pathlib import Path
+import zipfile
+
+import requests
+from bs4 import BeautifulSoup
+
+URL = "https://www.cenace.gob.mx/Paginas/SIM/Reportes/PreEnerServConMTR.aspx"
+
+session = requests.Session()
+
+HEADERS = {
+    "User-Agent": "Mozilla/5.0",
+    "Referer": URL,
+    "Origin": "https://www.cenace.gob.mx",
+    "Content-Type": "application/x-www-form-urlencoded",
+}
+
+# repo root = impermanent/
+ROOT_DIR = Path(__file__).resolve().parents[4]
+DEFAULT_BASE_DIR = ROOT_DIR / "data" / "cenace"
+
+
+def target_date_for_execution(execution_date: datetime) -> datetime:
+    return execution_date + timedelta(days=1)
+
+
+def raw_zip_path(date: datetime, raw_dir: Path) -> Path:
+    return raw_dir / f"{date.strftime('%Y%m%d')}.zip"
+
+
+def get_form_state() -> dict[str, str]:
+    r = session.get(URL, headers=HEADERS, timeout=60)  # requests has no default timeout
+    r.raise_for_status()
+    soup = BeautifulSoup(r.text, "html.parser")
+
+    def get_value(name: str) -> str:
+        el = soup.find("input", {"name": name})
+        return el.get("value", "") if el else ""  # default "" keeps the declared str contract
+
+    return {
+        "__VIEWSTATE": get_value("__VIEWSTATE"),
+        "__VIEWSTATEGENERATOR": get_value("__VIEWSTATEGENERATOR"),
+        "__VIEWSTATEENCRYPTED": get_value("__VIEWSTATEENCRYPTED"),
+        "__EVENTVALIDATION": get_value("__EVENTVALIDATION"),
+    }
+
+
+def download_and_extract(date: datetime, raw_dir: Path, tmp_dir: Path) -> bool:
+    date_str = date.strftime("%d/%m/%Y")
+    period_str = f"{date_str} - {date_str}"
+
+    state = get_form_state()
+
+    payload = {
+        "ctl00$ContentPlaceHolder1$ddlReporte": "362,325",
+        "ctl00$ContentPlaceHolder1$ddlPeriodicidad": "D",
+        "ctl00$ContentPlaceHolder1$ddlSistema": "SIN",
+        "ctl00$ContentPlaceHolder1$txtPeriodo": period_str,
+        "ctl00$ContentPlaceHolder1$hdfStartDateSelected": date_str,
+        "ctl00$ContentPlaceHolder1$hdfEndDateSelected": date_str,
+        "ctl00$ContentPlaceHolder1$btnDescargarZIP": "Descargar ZIP",
+        "__VIEWSTATE": state["__VIEWSTATE"],
+        "__VIEWSTATEGENERATOR": state["__VIEWSTATEGENERATOR"],
+        "__VIEWSTATEENCRYPTED": state["__VIEWSTATEENCRYPTED"],
+        "__EVENTVALIDATION": state["__EVENTVALIDATION"],
+        "__EVENTTARGET": "",
+        "__EVENTARGUMENT": "",
+    }
+
+    r = session.post(URL, data=payload, headers=HEADERS, timeout=120)
+    r.raise_for_status()
+
+    size = len(r.content)
+    print(f"{date_str} | {size} bytes")
+
+    if size < 10000:
+        print(f"Skipping {date_str}: file not published or response too small")
+        return False
+
+    raw_dir.mkdir(parents=True, exist_ok=True)
+    tmp_dir.mkdir(parents=True, exist_ok=True)
+
+    zip_path = raw_zip_path(date, raw_dir)
+
+    with open(zip_path, "wb") as f:
+        f.write(r.content)
+
+    with zipfile.ZipFile(zip_path, "r") as z:
+        z.extractall(tmp_dir)
+
+    return True
+
+
+def backfill_missing(start_date: datetime, end_date: datetime, base_dir: Path) -> None:
+    raw_dir = base_dir / "raw"
+    tmp_dir = base_dir / "tmp"
+
+    current = start_date
+    while current <= end_date:
+        zip_path = raw_zip_path(current, raw_dir)
+        if zip_path.exists():
+            print(f"Already have {current.strftime('%Y-%m-%d')}, skipping")
+        else:
+            try:
+                ok = download_and_extract(current, raw_dir, tmp_dir)
+                if not ok:
+                    print(f"Stopping at {current.strftime('%Y-%m-%d')}")
+                    break
+            except Exception as e:
+                print(f"Error on {current.strftime('%Y-%m-%d')}: {e}")
+                break
+        current += timedelta(days=1)
+
+
+def run_execution_date(execution_date: datetime, base_dir: Path) -> bool:
+    raw_dir = base_dir / "raw"
+    tmp_dir = base_dir / "tmp"
+    target_date = target_date_for_execution(execution_date)
+
+    zip_path = raw_zip_path(target_date, raw_dir)
+    if zip_path.exists():
+        print(f"Already have {target_date.strftime('%Y-%m-%d')}, skipping")
+        return True
+
+    return download_and_extract(target_date, raw_dir, tmp_dir)
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--execution-date", default=None)
+    parser.add_argument("--start-date", default=None)
+    parser.add_argument("--end-date", default=None)
+    parser.add_argument("--out", default=str(DEFAULT_BASE_DIR))
+    return parser.parse_args()
+
+
+def main() -> None:
+    args = parse_args()
+    base_dir = Path(args.out).resolve()
+
+    if args.start_date:
+        start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
+        if args.end_date:
+            end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
+        elif args.execution_date:
+            end_date = target_date_for_execution(
+                datetime.strptime(args.execution_date, "%Y-%m-%d")
+            )
+        else:
+            end_date = datetime.today()
+        backfill_missing(start_date=start_date, end_date=end_date, base_dir=base_dir)
+        return
+
+    if args.execution_date:
+        execution_date = datetime.strptime(args.execution_date, "%Y-%m-%d")
+        ok = run_execution_date(execution_date=execution_date, base_dir=base_dir)
+        if not ok:
+            print("No new CENACE publication detected; stopping cleanly")
+        return
+
+    raise ValueError("Provide either --start-date or --execution-date")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/data/cenace/transform/core.py b/src/data/cenace/transform/core.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/data/cenace/utils/cenace_data.py b/src/data/cenace/utils/cenace_data.py
new file mode 100644
index 0000000..5142720
--- /dev/null
+++ b/src/data/cenace/utils/cenace_data.py
@@ -0,0 +1,88 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+
+import duckdb
+import pandas as pd
+
+
+@dataclass
+class CENACEData:
+    base_path: Path
+    freq: str = "hourly"
+    h: int = 24
+    max_window_size: int = 24 * 90
+
+    def __post_init__(self) -> None:
+        self.base_path = Path(self.base_path)
+
+    def _date_to_partition(self, d: pd.Timestamp) -> Path:
+        return (
+            self.base_path
+            / f"year={d.year:04d}"
+            / f"month={d.month:02d}"
+            / f"day={d.day:02d}"
+            / "series.parquet"
+        )
+
+    def _paths_for_range(self, start: pd.Timestamp, end: pd.Timestamp) -> list[str]:
+        days = pd.date_range(start.normalize(), end.normalize(), freq="D")
+        paths = [self._date_to_partition(d) for d in days]
+        existing = [str(p) for p in paths if p.exists()]
+        if not existing:
+            raise FileNotFoundError(
+                f"No parquet files found between {start} and "
+                f"{end} under {self.base_path}"
+            )
+        return existing
+
+    def get_df(
+        self,
+        cutoff: str | pd.Timestamp,
+        max_window_size: int | None = None,
+        sort: bool = True,
+    ) -> pd.DataFrame:
+        cutoff = pd.Timestamp(cutoff)
+        window = self.max_window_size if max_window_size is None else max_window_size
+        start = cutoff - pd.Timedelta(hours=window - 1)
+
+        paths = self._paths_for_range(start, cutoff)
+
+        query = f"""
+        SELECT unique_id, ds, y
+        FROM read_parquet({paths})
+        WHERE ds >= TIMESTAMP '{start}'
+          AND ds <= TIMESTAMP '{cutoff}'
+        """
+
+        df = duckdb.sql(query).df()
+        df["ds"] = pd.to_datetime(df["ds"])
+
+        if sort:
+            df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True)
+
+        return df
+
+    def get_actuals(
+        self, cutoff: str | pd.Timestamp, h: int | None = None
+    ) -> pd.DataFrame:
+        cutoff = pd.Timestamp(cutoff)
+        horizon = self.h if h is None else h  # "is None" so an explicit 0 is respected
+
+        start = cutoff + pd.Timedelta(hours=1)
+        end = cutoff + pd.Timedelta(hours=horizon)
+
+        paths = self._paths_for_range(start, end)
+
+        query = f"""
+        SELECT unique_id, ds, y
+        FROM read_parquet({paths})
+        WHERE ds >= TIMESTAMP '{start}'
+          AND ds <= TIMESTAMP '{end}'
+        """
+
+        df = duckdb.sql(query).df()
+        df["ds"] = pd.to_datetime(df["ds"])
+        df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True)
+        return df
diff --git a/src/evaluation/cenace/__init__.py b/src/evaluation/cenace/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/evaluation/cenace/core.py b/src/evaluation/cenace/core.py
new file mode 100644
index 0000000..1257ac1
--- /dev/null
+++ b/src/evaluation/cenace/core.py
@@ -0,0 +1,94 @@
+from __future__ import annotations
+
+import argparse
+from pathlib import Path
+
+import pandas as pd
+
+from src.data.cenace.config import (
+    EVALUATIONS_HOURLY_DIR,
+    FORECASTS_HOURLY_DIR,
+    PROCESSED_EVENTS_HOURLY_DIR,
+)
+from src.data.cenace.utils.cenace_data import CENACEData
+from src.evaluation.evaluate import evaluate_forecast
+
+
+def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path:
+    return (
+        root
+        / f"year={cutoff.year:04d}"
+        / f"month={cutoff.month:02d}"
+        / f"day={cutoff.day:02d}"
+    )
+
+
+def run_evaluation(
+    cutoff: str | pd.Timestamp,
+    model: str,
+    h: int = 24,
+    max_window_size: int = 48,
+) -> Path:
+    cutoff = pd.Timestamp(cutoff)
+
+    data = CENACEData(
+        base_path=PROCESSED_EVENTS_HOURLY_DIR,
+        freq="hourly",
+        h=h,
+        max_window_size=max_window_size,
+    )
+
+    forecast_path = (
+        FORECASTS_HOURLY_DIR
+        / model
+        / f"year={cutoff.year:04d}"
+        / f"month={cutoff.month:02d}"
+        / f"day={cutoff.day:02d}"
+        / "forecasts.parquet"
+    )
+
+    train = data.get_df(cutoff, max_window_size=max_window_size)
+    actuals = data.get_actuals(cutoff, h=h)
+    forecasts = pd.read_parquet(forecast_path)
+
+    metrics, _ = evaluate_forecast(
+        forecast_df=forecasts,
+        actuals_df=actuals,
+        train_df=train,
+        seasonality=24,
+    )
+
+    eval_root = EVALUATIONS_HOURLY_DIR / model
+    out_dir = cutoff_partition(eval_root, cutoff)
+    out_dir.mkdir(parents=True, exist_ok=True)
+    out_path = out_dir / "metrics.parquet"
+
+    metrics.to_parquet(out_path, index=False)
+    return out_path
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--cutoff", required=True)
+    parser.add_argument("--model", required=True)
+    parser.add_argument("--h", type=int, default=24)
+    parser.add_argument("--max-window-size", type=int, default=48)
+    return parser.parse_args()
+
+
+def main() -> None:
+    args = parse_args()
+    out_path = run_evaluation(
+        cutoff=args.cutoff,
+        model=args.model,
+        h=args.h,
+        max_window_size=args.max_window_size,
+    )
+    metrics = pd.read_parquet(out_path)
+    print(f"Saved metrics: {out_path}")
+    print(metrics.head())
+    print(metrics.shape)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/forecast/cenace/__init__.py b/src/forecast/cenace/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/forecast/cenace/core.py b/src/forecast/cenace/core.py
new file mode 100644
index 0000000..2b31af1
--- /dev/null
+++ b/src/forecast/cenace/core.py
@@ -0,0 +1,81 @@
+from __future__ import annotations
+
+import argparse
+from pathlib import Path
+
+import pandas as pd
+
+from src.data.cenace.config import FORECASTS_HOURLY_DIR, PROCESSED_EVENTS_HOURLY_DIR
+from src.data.cenace.utils.cenace_data import CENACEData
+from src.forecast.forecast import generate_forecast
+
+
+def cutoff_partition(root: Path, cutoff: pd.Timestamp) -> Path:
+    return (
+        root
+        / f"year={cutoff.year:04d}"
+        / f"month={cutoff.month:02d}"
+        / f"day={cutoff.day:02d}"
+    )
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--cutoff", required=True)
+    parser.add_argument("--model", required=True)
+    parser.add_argument("--h", type=int, default=24)
+    parser.add_argument("--max-window-size", type=int, default=48)
+    return parser.parse_args()
+
+
+def run_forecast(
+    cutoff: str | pd.Timestamp,
+    model: str,
+    h: int = 24,
+    max_window_size: int = 48,
+) -> Path:
+    cutoff = pd.Timestamp(cutoff)
+
+    data = CENACEData(
+        base_path=PROCESSED_EVENTS_HOURLY_DIR,
+        freq="hourly",
+        h=h,
+        max_window_size=max_window_size,
+    )
+
+    train = data.get_df(cutoff, max_window_size=max_window_size)
+
+    model_name = "seasonal_naive" if model == "seasonal_naive_24" else model
+
+    forecasts = generate_forecast(
+        model_name=model_name,
+        df=train,
+        h=h,
+        freq="h",
+    )
+
+    if "y_hat" in forecasts.columns:
+        forecasts = forecasts.rename(columns={"y_hat": model})
+
+    forecast_root = FORECASTS_HOURLY_DIR / model
+    out_dir = cutoff_partition(forecast_root, cutoff)
+    out_dir.mkdir(parents=True, exist_ok=True)
+    out_path = out_dir / "forecasts.parquet"
+
+    forecasts.to_parquet(out_path, index=False)
+    return out_path
+
+
+def main() -> None:
+    args = parse_args()
+    out_path = run_forecast(
+        cutoff=args.cutoff,
+        model=args.model,
+        h=args.h,
+        max_window_size=args.max_window_size,
+    )
+    print(f"Forecasts saved to: {out_path}")
+
+
+if __name__ == "__main__":
+    main()