-
Notifications
You must be signed in to change notification settings - Fork 1
Add initial CENACE pipeline integration #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
40559a3
1575877
cb21d1e
3c2bf0e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import pandas as pd | ||
|
|
||
| from src.data.cenace.config import PROCESSED_CSV, PROCESSED_EVENTS_HOURLY_DIR | ||
|
|
||
| INPUT_CSV = PROCESSED_CSV | ||
| OUTPUT_ROOT = PROCESSED_EVENTS_HOURLY_DIR | ||
|
|
||
|
|
||
def build_hourly_partitions() -> int:
    """Read the processed CENACE CSV and write one parquet file per calendar day.

    Output follows a Hive-style layout under OUTPUT_ROOT:
    ``year=YYYY/month=MM/day=DD/series.parquet``, each file holding the
    (unique_id, ds, y) rows for that day.

    Returns:
        The number of daily partitions written.
    """
    frame = pd.read_csv(INPUT_CSV)

    # Coerce types; rows that fail to parse become NaN/NaT and are dropped below.
    frame["ds"] = pd.to_datetime(frame["ds"], errors="coerce")
    frame["y"] = pd.to_numeric(frame["y"], errors="coerce")

    frame = frame.dropna(subset=["unique_id", "ds", "y"]).copy()
    # Keep a single observation per (series, timestamp) pair.
    frame = frame.sort_values(["unique_id", "ds"]).drop_duplicates(["unique_id", "ds"])

    # Partition keys derived from the timestamp.
    frame["year"] = frame["ds"].dt.year
    frame["month"] = frame["ds"].dt.month
    frame["day"] = frame["ds"].dt.day

    OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)

    written = 0
    for (year, month, day), day_rows in frame.groupby(
        ["year", "month", "day"], sort=True
    ):
        target_dir = (
            OUTPUT_ROOT / f"year={year:04d}" / f"month={month:02d}" / f"day={day:02d}"
        )
        target_dir.mkdir(parents=True, exist_ok=True)

        target = target_dir / "series.parquet"
        day_rows[["unique_id", "ds", "y"]].to_parquet(target, index=False)

        print(f"Saved: {target}")
        written += 1

    return written
|
|
||
|
|
||
def main() -> None:
    """CLI entry point: build the partitions and report how many were written."""
    count = build_hourly_partitions()
    print(f"\nDone. Wrote {count} daily partitions.")
|
|
||
|
|
||
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from pathlib import Path | ||
|
|
||
# Repository root: this file is imported as src.data.cenace.config, i.e. it
# lives at <root>/src/data/cenace/config.py, so three .parents hops reach
# the repo root.
ROOT = Path(__file__).resolve().parents[3]

# All CENACE artifacts live under data/cenace at the repo root.
DATA_ROOT = ROOT / "data" / "cenace"

# Scratch area for extracted downloads.
TMP_DIR = DATA_ROOT / "tmp"
# Cleaned, consolidated data.
PROCESSED_DIR = DATA_ROOT / "processed"
PROCESSED_CSV = PROCESSED_DIR / "cenace.csv"

# Hive-partitioned hourly directories (year=/month=/day=) for the
# successive pipeline stages.
PROCESSED_EVENTS_HOURLY_DIR = DATA_ROOT / "processed-events" / "hourly"
FORECASTS_HOURLY_DIR = DATA_ROOT / "forecasts" / "hourly"
EVALUATIONS_HOURLY_DIR = DATA_ROOT / "evaluations" / "hourly"
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import argparse | ||
| from datetime import datetime, timedelta | ||
| from pathlib import Path | ||
| import zipfile | ||
|
|
||
| import requests | ||
| from bs4 import BeautifulSoup | ||
|
|
||
# CENACE report download page this scraper drives.
URL = "https://www.cenace.gob.mx/Paginas/SIM/Reportes/PreEnerServConMTR.aspx"

# Shared session so ASP.NET cookies persist between the GET that fetches
# the form state and the POST that triggers the download.
session = requests.Session()

HEADERS = {
    "User-Agent": "Mozilla/5.0",
    "Referer": URL,
    "Origin": "https://www.cenace.gob.mx",
    "Content-Type": "application/x-www-form-urlencoded",
}

# repo root = impermanent/
ROOT_DIR = Path(__file__).resolve().parents[4]
DEFAULT_BASE_DIR = ROOT_DIR / "data" / "cenace"
|
|
||
|
|
||
def target_date_for_execution(execution_date: datetime) -> datetime:
    """Return the publication date targeted by *execution_date*.

    A run executed on day D fetches the report for the following day.
    """
    one_day = timedelta(days=1)
    return execution_date + one_day
|
|
||
|
|
||
def raw_zip_path(date: datetime, raw_dir: Path) -> Path:
    """Return the canonical raw-archive path (``YYYYMMDD.zip``) for *date*."""
    stamp = date.strftime("%Y%m%d")
    return raw_dir / f"{stamp}.zip"
|
|
||
|
|
||
def get_form_state() -> dict[str, str]:
    """Fetch the report page and extract the ASP.NET hidden-form fields.

    These fields (__VIEWSTATE etc.) must be echoed back in the POST for
    the server to accept the download request.

    Returns:
        The four hidden-field values, with "" for any field that is
        missing or has no value attribute.

    Raises:
        requests.HTTPError: if the page fetch fails.
    """
    r = session.get(URL, headers=HEADERS)
    r.raise_for_status()
    soup = BeautifulSoup(r.text, "html.parser")

    def get_value(name: str) -> str:
        # el.get("value") alone returns None when the input exists but has
        # no value attribute; default to "" so the declared dict[str, str]
        # holds and the POST payload never carries None.
        el = soup.find("input", {"name": name})
        return el.get("value", "") if el else ""

    return {
        "__VIEWSTATE": get_value("__VIEWSTATE"),
        "__VIEWSTATEGENERATOR": get_value("__VIEWSTATEGENERATOR"),
        "__VIEWSTATEENCRYPTED": get_value("__VIEWSTATEENCRYPTED"),
        "__EVENTVALIDATION": get_value("__EVENTVALIDATION"),
    }
|
|
||
|
|
||
def download_and_extract(date: datetime, raw_dir: Path, tmp_dir: Path) -> bool:
    """Download the CENACE ZIP for *date* and extract it into *tmp_dir*.

    Posts the ASP.NET form with the report/date selection, saves the
    response as ``<raw_dir>/<YYYYMMDD>.zip``, then extracts it.

    Returns:
        True if a plausible archive was downloaded and extracted,
        False if the response looks too small (report not published yet).

    Raises:
        requests.HTTPError: if either HTTP request fails.
        zipfile.BadZipFile: if the saved response is not a valid ZIP.
    """
    date_str = date.strftime("%d/%m/%Y")
    period_str = f"{date_str} - {date_str}"

    # Fresh hidden-field state is required for each POST.
    state = get_form_state()

    payload = {
        # "362,325" selects the report(s) in the page's dropdown —
        # presumably the MTR price/services reports; TODO confirm against
        # the live CENACE form.
        "ctl00$ContentPlaceHolder1$ddlReporte": "362,325",
        "ctl00$ContentPlaceHolder1$ddlPeriodicidad": "D",
        "ctl00$ContentPlaceHolder1$ddlSistema": "SIN",
        "ctl00$ContentPlaceHolder1$txtPeriodo": period_str,
        "ctl00$ContentPlaceHolder1$hdfStartDateSelected": date_str,
        "ctl00$ContentPlaceHolder1$hdfEndDateSelected": date_str,
        "ctl00$ContentPlaceHolder1$btnDescargarZIP": "Descargar ZIP",
        "__VIEWSTATE": state["__VIEWSTATE"],
        "__VIEWSTATEGENERATOR": state["__VIEWSTATEGENERATOR"],
        "__VIEWSTATEENCRYPTED": state["__VIEWSTATEENCRYPTED"],
        "__EVENTVALIDATION": state["__EVENTVALIDATION"],
        "__EVENTTARGET": "",
        "__EVENTARGUMENT": "",
    }

    r = session.post(URL, data=payload, headers=HEADERS)
    r.raise_for_status()

    size = len(r.content)
    print(f"{date_str} | {size} bytes")

    # Heuristic: an unpublished report comes back as a small HTML page
    # rather than a ZIP; 10 kB is the cutoff used to tell them apart.
    if size < 10000:
        print(f"Skipping {date_str}: file not published or response too small")
        return False

    raw_dir.mkdir(parents=True, exist_ok=True)
    tmp_dir.mkdir(parents=True, exist_ok=True)

    zip_path = raw_zip_path(date, raw_dir)

    with open(zip_path, "wb") as f:
        f.write(r.content)

    # NOTE(review): extractall trusts archive member names; acceptable for
    # a known publisher, but a hostile ZIP could write outside tmp_dir.
    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(tmp_dir)

    return True
|
|
||
|
|
||
def backfill_missing(start_date: datetime, end_date: datetime, base_dir: Path) -> None:
    """Download every missing daily archive in [start_date, end_date].

    Stops at the first day that is unpublished or raises, on the
    assumption that later days will not be available either.
    """
    raw_dir = base_dir / "raw"
    tmp_dir = base_dir / "tmp"

    one_day = timedelta(days=1)
    current = start_date
    while current <= end_date:
        label = current.strftime("%Y-%m-%d")
        if raw_zip_path(current, raw_dir).exists():
            # Already on disk — nothing to do for this day.
            print(f"Already have {label}, skipping")
            current += one_day
            continue
        try:
            if not download_and_extract(current, raw_dir, tmp_dir):
                print(f"Stopping at {label}")
                break
        except Exception as e:
            print(f"Error on {label}: {e}")
            break
        current += one_day
|
|
||
|
|
||
def run_execution_date(execution_date: datetime, base_dir: Path) -> bool:
    """Fetch the single archive targeted by *execution_date*.

    Returns:
        True if the archive is already on disk or was downloaded now,
        False if CENACE has not published it yet.
    """
    raw_dir = base_dir / "raw"
    tmp_dir = base_dir / "tmp"
    target_date = target_date_for_execution(execution_date)

    if raw_zip_path(target_date, raw_dir).exists():
        print(f"Already have {target_date.strftime('%Y-%m-%d')}, skipping")
        return True

    return download_and_extract(target_date, raw_dir, tmp_dir)
|
|
||
|
|
||
def parse_args() -> argparse.Namespace:
    """Define and parse this script's command-line arguments."""
    parser = argparse.ArgumentParser()
    # Dates are plain strings here; main() parses them as %Y-%m-%d.
    for flag in ("--execution-date", "--start-date", "--end-date"):
        parser.add_argument(flag, default=None)
    parser.add_argument("--out", default=str(DEFAULT_BASE_DIR))
    return parser.parse_args()
|
|
||
|
|
||
def main() -> None:
    """Dispatch to backfill mode (--start-date) or single-day mode (--execution-date)."""
    args = parse_args()
    base_dir = Path(args.out).resolve()

    if args.start_date:
        start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
        # End of the backfill window: explicit --end-date wins, then the
        # target implied by --execution-date, then today.
        if args.end_date:
            end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
        elif args.execution_date:
            parsed = datetime.strptime(args.execution_date, "%Y-%m-%d")
            end_date = target_date_for_execution(parsed)
        else:
            end_date = datetime.today()
        backfill_missing(start_date=start_date, end_date=end_date, base_dir=base_dir)
        return

    if args.execution_date:
        execution_date = datetime.strptime(args.execution_date, "%Y-%m-%d")
        if not run_execution_date(execution_date=execution_date, base_dir=base_dir):
            print("No new CENACE publication detected; stopping cleanly")
        return

    raise ValueError("Provide either --start-date or --execution-date")
|
|
||
|
|
||
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from dataclasses import dataclass | ||
| from pathlib import Path | ||
|
|
||
| import duckdb | ||
| import pandas as pd | ||
|
|
||
|
|
||
| @dataclass | ||
| class CENACEData: | ||
| base_path: Path | ||
| freq: str = "hourly" | ||
| h: int = 24 | ||
| max_window_size: int = 24 * 90 | ||
|
|
||
| def __post_init__(self) -> None: | ||
| self.base_path = Path(self.base_path) | ||
|
|
||
| def _date_to_partition(self, d: pd.Timestamp) -> Path: | ||
| return ( | ||
| self.base_path | ||
| / f"year={d.year:04d}" | ||
| / f"month={d.month:02d}" | ||
| / f"day={d.day:02d}" | ||
| / "series.parquet" | ||
| ) | ||
|
|
||
| def _paths_for_range(self, start: pd.Timestamp, end: pd.Timestamp) -> list[str]: | ||
| days = pd.date_range(start.normalize(), end.normalize(), freq="D") | ||
| paths = [self._date_to_partition(d) for d in days] | ||
| existing = [str(p) for p in paths if p.exists()] | ||
| if not existing: | ||
| raise FileNotFoundError( | ||
| f"No parquet files found between {start} and \ | ||
| {end} under {self.base_path}" | ||
| ) | ||
| return existing | ||
|
|
||
| def get_df( | ||
| self, | ||
| cutoff: str | pd.Timestamp, | ||
| max_window_size: int | None = None, | ||
| sort: bool = True, | ||
| ) -> pd.DataFrame: | ||
| cutoff = pd.Timestamp(cutoff) | ||
| window = max_window_size or self.max_window_size | ||
| start = cutoff - pd.Timedelta(hours=window - 1) | ||
|
|
||
| paths = self._paths_for_range(start, cutoff) | ||
|
|
||
| query = f""" | ||
| SELECT unique_id, ds, y | ||
| FROM read_parquet({paths}) | ||
| WHERE ds >= TIMESTAMP '{start}' | ||
| AND ds <= TIMESTAMP '{cutoff}' | ||
| """ | ||
|
|
||
| df = duckdb.sql(query).df() | ||
| df["ds"] = pd.to_datetime(df["ds"]) | ||
|
Comment on lines
+50
to
+60
|
||
|
|
||
| if sort: | ||
| df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True) | ||
|
|
||
| return df | ||
|
|
||
| def get_actuals( | ||
| self, cutoff: str | pd.Timestamp, h: int | None = None | ||
| ) -> pd.DataFrame: | ||
| cutoff = pd.Timestamp(cutoff) | ||
| horizon = h or self.h | ||
|
|
||
| start = cutoff + pd.Timedelta(hours=1) | ||
| end = cutoff + pd.Timedelta(hours=horizon) | ||
|
|
||
| paths = self._paths_for_range(start, end) | ||
|
|
||
| query = f""" | ||
| SELECT unique_id, ds, y | ||
| FROM read_parquet({paths}) | ||
| WHERE ds >= TIMESTAMP '{start}' | ||
| AND ds <= TIMESTAMP '{end}' | ||
| """ | ||
|
|
||
| df = duckdb.sql(query).df() | ||
| df["ds"] = pd.to_datetime(df["ds"]) | ||
| df = df.sort_values(["unique_id", "ds"]).reset_index(drop=True) | ||
| return df | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
FileNotFoundErrormessage uses a line-continuation backslash inside the f-string, which will embed a newline and indentation spaces into the exception text. Format this as a single-line f-string (or usetextwrap.dedent) so the error message is stable and readable.