From 9a6f8b98c3dd74603a5f26ac440cd7437c447aa9 Mon Sep 17 00:00:00 2001 From: Esteban Zimanyi Date: Tue, 19 May 2026 12:36:44 +0200 Subject: [PATCH] Add data-lake Parquet/Arrow consumer (pymeos.io) Mirrors the MobilityDuck data-lake recipe so files interoperate: a MEOS-WKB payload column, O(1) native-scalar bounding-box sidecar columns for Parquet/Arrow row-group pruning, and a temporal footer byte-identical to MobilityDuck's temporalFooter(). Adds to_arrow / from_arrow / write_temporal / read_temporal, pyarrow as the optional [parquet] extra, a quickstart example and tests. --- examples/datalake_quickstart.py | 62 ++++++ pymeos/io/__init__.py | 35 ++++ pymeos/io/parquet.py | 332 ++++++++++++++++++++++++++++++++ pyproject.toml | 4 + tests/io/test_parquet.py | 97 ++++++++++ 5 files changed, 530 insertions(+) create mode 100644 examples/datalake_quickstart.py create mode 100644 pymeos/io/__init__.py create mode 100644 pymeos/io/parquet.py create mode 100644 tests/io/test_parquet.py diff --git a/examples/datalake_quickstart.py b/examples/datalake_quickstart.py new file mode 100644 index 00000000..fc3ed412 --- /dev/null +++ b/examples/datalake_quickstart.py @@ -0,0 +1,62 @@ +""" +Edge-to-cloud data-lake quickstart for PyMEOS. + +Python port of the MobilityDuck ``examples/quickstart/quickstart.sql`` recipe. +Files written here are interoperable with MobilityDuck: same MEOS-WKB payload, +same native-scalar sidecar columns, same ``temporal`` footer. + +Run with the ``parquet`` extra installed:: + + pip install "pymeos[parquet]" + python examples/datalake_quickstart.py +""" + +from datetime import datetime, timedelta, timezone + +from pymeos import pymeos_initialize, pymeos_finalize, TGeogPointSeq +from pymeos.io.parquet import write_temporal, read_temporal + +pymeos_initialize("UTC") +try: + # 1. Raw pings -> one trajectory per moving entity. + base = datetime(2026, 1, 15, tzinfo=timezone.utc) + fleet = [ + (1, 10.00, 55.50, 0.23, 0.05), + (2, 14.00, 56.00, -0.18, -0.08), + (3, 8.50, 57.50, 0.06, -0.06), + (4, 12.10, 55.20, 0.04, 0.02), + (5, 9.50, 54.50, 0.22, 0.06), + ] + + def trajectory(lon, lat, dlon, dlat): + pings = ", ".join( + f"Point({lon + dlon * s:.6f} {lat + dlat * s:.6f})@" + f"{(base + timedelta(minutes=10 * s)).isoformat()}" + for s in range(12) + ) + return TGeogPointSeq(f"[{pings}]") + + rows = { + "entity_id": [e[0] for e in fleet], + "traj": [trajectory(*e[1:]) for e in fleet], + } + print(f"{len(rows['entity_id'])} trajectories") + + # 2. Write to Parquet: MEOS-WKB payload + native-scalar sidecars + # (traj_ts_min/max, traj_x/y_min/max) + the `temporal` footer. + path = "edge_to_cloud_demo.parquet" + write_temporal(rows, path, row_group_size=2) + + # 3. Cloud side: prune on a native sidecar column *without* decoding any + # trajectory, then reconstruct only the survivors from WKB. + window_start = base + timedelta(minutes=20) + selected = read_temporal(path, filters=[("traj_ts_max", ">=", window_start)]) + print(f"{len(selected['entity_id'])} trajectories overlap the window") + + for eid, traj in zip(selected["entity_id"], selected["traj"]): + print( + f" entity {eid}: length={traj.length():.1f} m, " + f"type={type(traj).__name__}" + ) +finally: + pymeos_finalize() diff --git a/pymeos/io/__init__.py b/pymeos/io/__init__.py new file mode 100644 index 00000000..3c9604fd --- /dev/null +++ b/pymeos/io/__init__.py @@ -0,0 +1,35 @@ +""" +Data-lake interchange for PyMEOS temporal types. + +Mirrors the MobilityDuck data-lake consumer recipe so files written by either +tool interoperate: an opaque MEOS-WKB payload column, O(1) native-scalar +bounding-box *sidecar* columns that let Parquet/Arrow engines prune row groups +without decoding the temporal payload, and a ``temporal`` footer declaring the +column encodings. + +The public surface lives in :mod:`pymeos.io.parquet`. Importing this package +does not require ``pyarrow``; only calling the read/write helpers does +(``pip install pymeos[parquet]``). +""" + +from .parquet import ( + temporal_footer, + to_arrow, + from_arrow, + write_temporal, + read_temporal, + FOOTER_VERSION, + WKB_ENCODING, + WKB_ENCODING_VERSION, +) + +__all__ = [ + "temporal_footer", + "to_arrow", + "from_arrow", + "write_temporal", + "read_temporal", + "FOOTER_VERSION", + "WKB_ENCODING", + "WKB_ENCODING_VERSION", +] diff --git a/pymeos/io/parquet.py b/pymeos/io/parquet.py new file mode 100644 index 00000000..fd6d6c2d --- /dev/null +++ b/pymeos/io/parquet.py @@ -0,0 +1,332 @@ +""" +Parquet / Arrow data-lake interchange for PyMEOS temporal types. + +Recipe (identical to the MobilityDuck data-lake consumer, so files written by +either tool interoperate): + +* the temporal column is stored as an opaque **MEOS-WKB** payload + (``Temporal.as_wkb()``), +* O(1) **native-scalar sidecar** columns are derived from each value's inline + bounding box (``ts_min``/``ts_max`` and, per type, ``x*/y*/z*`` spatial or + ``v*`` numeric bounds) so a Parquet/Arrow engine prunes row groups from + column statistics *without decoding the payload*, +* a ``temporal`` footer (Parquet ``KV_METADATA`` / Arrow schema metadata) + declares each temporal column's encoding and base type. + +``pyarrow`` is required only to call these helpers (``pip install +pymeos[parquet]``). +""" + +from __future__ import annotations + +import json +from typing import Any, Mapping, Optional, Sequence, Union + +FOOTER_KEY = "temporal" +FOOTER_VERSION = "1.0.0" +WKB_ENCODING = "MEOS-WKB" +WKB_ENCODING_VERSION = "1.0" + +__all__ = [ + "temporal_footer", + "to_arrow", + "from_arrow", + "write_temporal", + "read_temporal", + "FOOTER_VERSION", + "WKB_ENCODING", + "WKB_ENCODING_VERSION", +] + + +def _pa(): + try: + import pyarrow as pa # noqa: F401 + + return pa + except ModuleNotFoundError as e: # pragma: no cover - exercised via extra + raise ModuleNotFoundError( + "pyarrow is required for pymeos.io; install it with " + "`pip install pymeos[parquet]`" + ) from e + + +def _pq(): + _pa() + import pyarrow.parquet as pq + + return pq + + +# --------------------------------------------------------------------------- # +# Footer (interop contract, byte-identical to MobilityDuck temporalFooter()) +# --------------------------------------------------------------------------- # +def temporal_footer(columns: Mapping[str, str]) -> str: + """ + Build the ``temporal`` footer JSON declaring the WKB-encoded columns. + + Byte-identical to MobilityDuck's ``temporalFooter()`` so a Parquet file is + portable across both tools:: + + {"version":"1.0.0","columns":{"":{"encoding":"MEOS-WKB", + "encoding_version":"1.0","base_type":""}}} + + Args: + columns: mapping of column name to MEOS base type (e.g. + ``{"trip": "tgeompoint"}``). + + Returns: + The footer as a compact JSON string. + """ + payload = { + "version": FOOTER_VERSION, + "columns": { + name: { + "encoding": WKB_ENCODING, + "encoding_version": WKB_ENCODING_VERSION, + "base_type": base_type, + } + for name, base_type in columns.items() + }, + } + return json.dumps(payload, separators=(",", ":")) + + +def _parse_footer(blob: Optional[bytes]) -> dict: + if not blob: + return {} + footer = json.loads(blob.decode("utf-8") if isinstance(blob, bytes) else blob) + return footer.get("columns", {}) + + +# --------------------------------------------------------------------------- # +# Type/bbox introspection +# --------------------------------------------------------------------------- # +def _base_type_name(obj: Any) -> str: + from ..main import TGeomPoint, TGeogPoint, TInt, TFloat, TBool, TText + + if isinstance(obj, TGeomPoint): + return "tgeompoint" + if isinstance(obj, TGeogPoint): + return "tgeogpoint" + if isinstance(obj, TInt): + return "tint" + if isinstance(obj, TFloat): + return "tfloat" + if isinstance(obj, TBool): + return "tbool" + if isinstance(obj, TText): + return "ttext" + raise TypeError(f"unsupported temporal type: {type(obj).__name__}") + + +def _sidecar_values(obj: Any) -> "dict[str, Any]": + """Native-scalar bounds extracted O(1) from the value's inline bbox.""" + from ..main import TPoint, TNumber + + if isinstance(obj, TPoint): + box = obj.bounding_box() + out = { + "ts_min": box.tmin(), + "ts_max": box.tmax(), + "x_min": box.xmin(), + "x_max": box.xmax(), + "y_min": box.ymin(), + "y_max": box.ymax(), + } + if box.zmin() is not None: + out["z_min"] = box.zmin() + out["z_max"] = box.zmax() + return out + if isinstance(obj, TNumber): + box = obj.bounding_box() + return { + "ts_min": box.tmin(), + "ts_max": box.tmax(), + "v_min": box.xmin(), + "v_max": box.xmax(), + } + span = obj.bounding_box() # TsTzSpan for tbool / ttext + return {"ts_min": span.lower(), "ts_max": span.upper()} + + +_TS_KEYS = {"ts_min", "ts_max"} + + +def _sidecar_arrow_type(suffix: str): + pa = _pa() + return pa.timestamp("us", tz="UTC") if suffix in _TS_KEYS else pa.float64() + + +# --------------------------------------------------------------------------- # +# Columnar normalisation +# --------------------------------------------------------------------------- # +def _as_columns(data: Any) -> "dict[str, list]": + # pandas DataFrame + if hasattr(data, "to_dict") and hasattr(data, "columns"): + return {c: list(data[c]) for c in data.columns} + # Mapping of column -> sequence + if isinstance(data, Mapping): + return {k: list(v) for k, v in data.items()} + # Sequence of row mappings + rows = list(data) + if rows and isinstance(rows[0], Mapping): + keys = list(rows[0].keys()) + return {k: [r.get(k) for r in rows] for k in keys} + raise TypeError( + "data must be a pandas DataFrame, a {column: sequence} mapping, " + "or a sequence of row dicts" + ) + + +def _is_temporal(value: Any) -> bool: + from ..temporal import Temporal + + return isinstance(value, Temporal) + + +def _detect_temporal_columns( + cols: "dict[str, list]", explicit: Optional[Sequence[str]] +) -> "list[str]": + if explicit is not None: + return list(explicit) + detected = [] + for name, values in cols.items(): + if any(v is not None and _is_temporal(v) for v in values): + detected.append(name) + return detected + + +# --------------------------------------------------------------------------- # +# Arrow-native API +# --------------------------------------------------------------------------- # +def to_arrow( + data: Any, + *, + temporal_columns: Optional[Sequence[str]] = None, + sidecars: bool = True, +): + """ + Build a ``pyarrow.Table`` with WKB payload columns, native-scalar sidecars + and the ``temporal`` footer in the schema metadata. + + Args: + data: a pandas ``DataFrame``, a ``{column: sequence}`` mapping, or a + sequence of row dicts. + temporal_columns: columns to encode; auto-detected (any column holding + ``Temporal`` values) when omitted. + sidecars: also emit ``_ts_min``/``ts_max`` and the per-type + spatial (``x/y/z``) or numeric (``v``) bound columns. + + Returns: + ``pyarrow.Table``. + """ + pa = _pa() + cols = _as_columns(data) + temporal_cols = _detect_temporal_columns(cols, temporal_columns) + + arrays: "dict[str, Any]" = {} + footer_cols: "dict[str, str]" = {} + + for name, values in cols.items(): + if name not in temporal_cols: + arrays[name] = pa.array(values) + continue + + base_type = None + wkb_col: "list[Optional[bytes]]" = [] + per_row: "list[Optional[dict]]" = [] # row-aligned sidecar dicts + for v in values: + if v is None: + wkb_col.append(None) + per_row.append(None) + continue + if base_type is None: + base_type = _base_type_name(v) + wkb_col.append(v.as_wkb()) + per_row.append(_sidecar_values(v) if sidecars else None) + if base_type is None: + raise ValueError(f"temporal column {name!r} is entirely null") + footer_cols[name] = base_type + arrays[name] = pa.array(wkb_col, type=pa.binary()) + if sidecars: + suffixes: "list[str]" = [] + for d in per_row: + for k in d or (): + if k not in suffixes: + suffixes.append(k) + for suffix in suffixes: + col = [None if d is None else d.get(suffix) for d in per_row] + arrays[f"{name}_{suffix}"] = pa.array( + col, type=_sidecar_arrow_type(suffix) + ) + + table = pa.table(arrays) + md = dict(table.schema.metadata or {}) + md[FOOTER_KEY.encode()] = temporal_footer(footer_cols).encode("utf-8") + return table.replace_schema_metadata(md) + + +def from_arrow(table, *, reconstruct: bool = True): + """ + Reverse of :func:`to_arrow`. Reads the ``temporal`` footer and, when + ``reconstruct`` is set, rebuilds PyMEOS objects from the WKB columns + (the type is recovered from the WKB itself, the footer is advisory). + + Returns a ``dict`` of ``{column: list}`` (sidecar columns preserved). + """ + _pa() + from ..temporal import Temporal + + footer = _parse_footer((table.schema.metadata or {}).get(FOOTER_KEY.encode())) + out: "dict[str, list]" = {} + for name in table.column_names: + values = table.column(name).to_pylist() + if reconstruct and name in footer: + out[name] = [None if v is None else Temporal.from_wkb(v) for v in values] + else: + out[name] = values + return out + + +# --------------------------------------------------------------------------- # +# Parquet file API +# --------------------------------------------------------------------------- # +def write_temporal( + data: Any, + path: str, + *, + temporal_columns: Optional[Sequence[str]] = None, + sidecars: bool = True, + row_group_size: Optional[int] = None, + **write_kwargs: Any, +) -> None: + """ + Write ``data`` to a Parquet file using the data-lake recipe. The + ``temporal`` footer lands in the Parquet ``KV_METADATA`` and the native + sidecar columns carry the row-group statistics used for pushdown. + """ + pq = _pq() + table = to_arrow(data, temporal_columns=temporal_columns, sidecars=sidecars) + pq.write_table(table, path, row_group_size=row_group_size, **write_kwargs) + + +def read_temporal( + path: str, + *, + columns: Optional[Sequence[str]] = None, + filters: Optional[Any] = None, + reconstruct: bool = True, +): + """ + Read a Parquet file written by :func:`write_temporal` (or MobilityDuck). + + ``filters`` is passed straight to ``pyarrow.parquet``; predicates on the + native sidecar columns (e.g. ``[("trip_ts_min", "<=", end)]``) prune row + groups before any WKB is decoded. + """ + pq = _pq() + table = pq.read_table( + path, columns=list(columns) if columns else None, filters=filters + ) + return from_arrow(table, reconstruct=reconstruct) diff --git a/pyproject.toml b/pyproject.toml index d0b8aaa2..9f0fcd14 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,10 @@ pandas = [ 'geopandas' ] +parquet = [ + 'pyarrow' +] + [project.urls] 'Homepage' = 'https://github.com/MobilityDB/PyMEOS/pymeos' 'Bug Tracker' = 'https://github.com/MobilityDB/PyMEOS/issues' diff --git a/tests/io/test_parquet.py b/tests/io/test_parquet.py new file mode 100644 index 00000000..fa5f8583 --- /dev/null +++ b/tests/io/test_parquet.py @@ -0,0 +1,97 @@ +from datetime import datetime, timedelta, timezone + +import pytest + +pytest.importorskip("pyarrow") # the optional `parquet` extra + +import pyarrow as pa # noqa: E402 +import pyarrow.parquet as pq # noqa: E402 + +from pymeos import TGeogPointSeq, TFloatSeq # noqa: E402 +from pymeos.io.parquet import ( # noqa: E402 + temporal_footer, + to_arrow, + from_arrow, + write_temporal, + read_temporal, +) + +# Byte-for-byte output of MobilityDuck's temporalFooter() (interop contract). +_MOBILITYDUCK_FOOTER = ( + '{"version":"1.0.0","columns":{"traj":{"encoding":"MEOS-WKB",' + '"encoding_version":"1.0","base_type":"tgeogpoint"}}}' +) + + +def _base(): + return datetime(2026, 1, 15, tzinfo=timezone.utc) + + +def _traj(lon, lat, dlon, dlat): + pts = ", ".join( + f"Point({lon + dlon * s:.6f} {lat + dlat * s:.6f})@" + f"{(_base() + timedelta(minutes=10 * s)).isoformat()}" + for s in range(12) + ) + return TGeogPointSeq(f"[{pts}]") + + +def test_footer_is_byte_identical_to_mobilityduck(): + assert temporal_footer({"traj": "tgeogpoint"}) == _MOBILITYDUCK_FOOTER + + +def test_to_arrow_emits_wkb_payload_sidecars_and_footer(): + rows = { + "entity_id": [1, 2], + "traj": [_traj(10, 55, 0.2, 0.05), _traj(14, 56, -0.1, -0.08)], + } + tbl = to_arrow(rows) + + assert b"temporal" in (tbl.schema.metadata or {}) + assert pa.types.is_binary(tbl.schema.field("traj").type) + for c in ( + "traj_ts_min", + "traj_ts_max", + "traj_x_min", + "traj_x_max", + "traj_y_min", + "traj_y_max", + ): + assert c in tbl.column_names + assert pa.types.is_timestamp(tbl.schema.field("traj_ts_min").type) + assert pa.types.is_floating(tbl.schema.field("traj_x_min").type) + + +def test_parquet_roundtrip_footer_and_sidecar_pushdown(tmp_path): + rows = { + "entity_id": [1, 2, 3], + "traj": [ + _traj(10, 55, 0.2, 0.05), + _traj(8.5, 57.5, 0.06, -0.06), + _traj(9.5, 54.5, 0.22, 0.06), + ], + } + path = str(tmp_path / "demo.parquet") + write_temporal(rows, path, row_group_size=1) + + assert pq.read_metadata(path).metadata[b"temporal"] == _MOBILITYDUCK_FOOTER.encode() + + # every trajectory starts at _base(); a >= cutoff filter prunes everything + cutoff = _base() + timedelta(minutes=30) + pruned = read_temporal(path, filters=[("traj_ts_min", ">=", cutoff)]) + assert len(pruned["entity_id"]) == 0 + + full = read_temporal(path) + assert len(full["entity_id"]) == 3 + assert type(full["traj"][0]).__name__.startswith("TGeogPoint") + assert full["traj"][0] == rows["traj"][0] # WKB round-trip is lossless + + +def test_tfloat_numeric_sidecars_roundtrip(): + data = {"id": [1], "v": [TFloatSeq("[1@2026-01-01, 9@2026-01-02]")]} + tbl = to_arrow(data) + assert {"v_v_min", "v_v_max", "v_ts_min", "v_ts_max"}.issubset( + set(tbl.column_names) + ) + back = from_arrow(tbl) + assert back["v"][0] == data["v"][0]