diff --git a/examples/datalake_quickstart.py b/examples/datalake_quickstart.py
new file mode 100644
index 00000000..fc3ed412
--- /dev/null
+++ b/examples/datalake_quickstart.py
@@ -0,0 +1,62 @@
+"""
+Edge-to-cloud data-lake quickstart for PyMEOS.
+
+Python port of the MobilityDuck ``examples/quickstart/quickstart.sql`` recipe.
+Files written here are interoperable with MobilityDuck: same MEOS-WKB payload,
+same native-scalar sidecar columns, same ``temporal`` footer.
+
+Run with the ``parquet`` extra installed::
+
+ pip install "pymeos[parquet]"
+ python examples/datalake_quickstart.py
+"""
+
+from datetime import datetime, timedelta, timezone
+
+from pymeos import pymeos_initialize, pymeos_finalize, TGeogPointSeq
+from pymeos.io.parquet import write_temporal, read_temporal
+
+pymeos_initialize("UTC")
+try:
+ # 1. Raw pings -> one trajectory per moving entity.
+ base = datetime(2026, 1, 15, tzinfo=timezone.utc)
+ fleet = [
+ (1, 10.00, 55.50, 0.23, 0.05),
+ (2, 14.00, 56.00, -0.18, -0.08),
+ (3, 8.50, 57.50, 0.06, -0.06),
+ (4, 12.10, 55.20, 0.04, 0.02),
+ (5, 9.50, 54.50, 0.22, 0.06),
+ ]
+
+ def trajectory(lon, lat, dlon, dlat):
+ pings = ", ".join(
+ f"Point({lon + dlon * s:.6f} {lat + dlat * s:.6f})@"
+ f"{(base + timedelta(minutes=10 * s)).isoformat()}"
+ for s in range(12)
+ )
+ return TGeogPointSeq(f"[{pings}]")
+
+ rows = {
+ "entity_id": [e[0] for e in fleet],
+ "traj": [trajectory(*e[1:]) for e in fleet],
+ }
+ print(f"{len(rows['entity_id'])} trajectories")
+
+ # 2. Write to Parquet: MEOS-WKB payload + native-scalar sidecars
+ # (traj_ts_min/max, traj_x/y_min/max) + the `temporal` footer.
+ path = "edge_to_cloud_demo.parquet"
+ write_temporal(rows, path, row_group_size=2)
+
+ # 3. Cloud side: prune on a native sidecar column *without* decoding any
+ # trajectory, then reconstruct only the survivors from WKB.
+ window_start = base + timedelta(minutes=20)
+ selected = read_temporal(path, filters=[("traj_ts_max", ">=", window_start)])
+ print(f"{len(selected['entity_id'])} trajectories overlap the window")
+
+ for eid, traj in zip(selected["entity_id"], selected["traj"]):
+ print(
+ f" entity {eid}: length={traj.length():.1f} m, "
+ f"type={type(traj).__name__}"
+ )
+finally:
+ pymeos_finalize()
diff --git a/pymeos/io/__init__.py b/pymeos/io/__init__.py
new file mode 100644
index 00000000..3c9604fd
--- /dev/null
+++ b/pymeos/io/__init__.py
@@ -0,0 +1,35 @@
+"""
+Data-lake interchange for PyMEOS temporal types.
+
+Mirrors the MobilityDuck data-lake consumer recipe so files written by either
+tool interoperate: an opaque MEOS-WKB payload column, O(1) native-scalar
+bounding-box *sidecar* columns that let Parquet/Arrow engines prune row groups
+without decoding the temporal payload, and a ``temporal`` footer declaring the
+column encodings.
+
+The public surface lives in :mod:`pymeos.io.parquet`. Importing this package
+does not require ``pyarrow``; only calling the read/write helpers does
+(``pip install pymeos[parquet]``).
+"""
+
+from .parquet import (
+ temporal_footer,
+ to_arrow,
+ from_arrow,
+ write_temporal,
+ read_temporal,
+ FOOTER_VERSION,
+ WKB_ENCODING,
+ WKB_ENCODING_VERSION,
+)
+
+__all__ = [
+ "temporal_footer",
+ "to_arrow",
+ "from_arrow",
+ "write_temporal",
+ "read_temporal",
+ "FOOTER_VERSION",
+ "WKB_ENCODING",
+ "WKB_ENCODING_VERSION",
+]
diff --git a/pymeos/io/parquet.py b/pymeos/io/parquet.py
new file mode 100644
index 00000000..fd6d6c2d
--- /dev/null
+++ b/pymeos/io/parquet.py
@@ -0,0 +1,332 @@
+"""
+Parquet / Arrow data-lake interchange for PyMEOS temporal types.
+
+Recipe (identical to the MobilityDuck data-lake consumer, so files written by
+either tool interoperate):
+
+* the temporal column is stored as an opaque **MEOS-WKB** payload
+ (``Temporal.as_wkb()``),
+* O(1) **native-scalar sidecar** columns are derived from each value's inline
+ bounding box (``ts_min``/``ts_max`` and, per type, ``x*/y*/z*`` spatial or
+ ``v*`` numeric bounds) so a Parquet/Arrow engine prunes row groups from
+ column statistics *without decoding the payload*,
+* a ``temporal`` footer (Parquet ``KV_METADATA`` / Arrow schema metadata)
+ declares each temporal column's encoding and base type.
+
+``pyarrow`` is required only to call these helpers (``pip install
+pymeos[parquet]``).
+"""
+
+from __future__ import annotations
+
+import json
+from typing import Any, Mapping, Optional, Sequence, Union
+
+FOOTER_KEY = "temporal"
+FOOTER_VERSION = "1.0.0"
+WKB_ENCODING = "MEOS-WKB"
+WKB_ENCODING_VERSION = "1.0"
+
+__all__ = [
+ "temporal_footer",
+ "to_arrow",
+ "from_arrow",
+ "write_temporal",
+ "read_temporal",
+ "FOOTER_VERSION",
+ "WKB_ENCODING",
+ "WKB_ENCODING_VERSION",
+]
+
+
+def _pa():
+ try:
+ import pyarrow as pa # noqa: F401
+
+ return pa
+ except ModuleNotFoundError as e: # pragma: no cover - exercised via extra
+ raise ModuleNotFoundError(
+ "pyarrow is required for pymeos.io; install it with "
+ "`pip install pymeos[parquet]`"
+ ) from e
+
+
+def _pq():
+ _pa()
+ import pyarrow.parquet as pq
+
+ return pq
+
+
+# --------------------------------------------------------------------------- #
+# Footer (interop contract, byte-identical to MobilityDuck temporalFooter())
+# --------------------------------------------------------------------------- #
+def temporal_footer(columns: Mapping[str, str]) -> str:
+ """
+ Build the ``temporal`` footer JSON declaring the WKB-encoded columns.
+
+ Byte-identical to MobilityDuck's ``temporalFooter()`` so a Parquet file is
+ portable across both tools::
+
+ {"version":"1.0.0","columns":{"
":{"encoding":"MEOS-WKB",
+ "encoding_version":"1.0","base_type":""}}}
+
+ Args:
+ columns: mapping of column name to MEOS base type (e.g.
+ ``{"trip": "tgeompoint"}``).
+
+ Returns:
+ The footer as a compact JSON string.
+ """
+ payload = {
+ "version": FOOTER_VERSION,
+ "columns": {
+ name: {
+ "encoding": WKB_ENCODING,
+ "encoding_version": WKB_ENCODING_VERSION,
+ "base_type": base_type,
+ }
+ for name, base_type in columns.items()
+ },
+ }
+ return json.dumps(payload, separators=(",", ":"))
+
+
+def _parse_footer(blob: Optional[bytes]) -> dict:
+ if not blob:
+ return {}
+ footer = json.loads(blob.decode("utf-8") if isinstance(blob, bytes) else blob)
+ return footer.get("columns", {})
+
+
+# --------------------------------------------------------------------------- #
+# Type/bbox introspection
+# --------------------------------------------------------------------------- #
+def _base_type_name(obj: Any) -> str:
+ from ..main import TGeomPoint, TGeogPoint, TInt, TFloat, TBool, TText
+
+ if isinstance(obj, TGeomPoint):
+ return "tgeompoint"
+ if isinstance(obj, TGeogPoint):
+ return "tgeogpoint"
+ if isinstance(obj, TInt):
+ return "tint"
+ if isinstance(obj, TFloat):
+ return "tfloat"
+ if isinstance(obj, TBool):
+ return "tbool"
+ if isinstance(obj, TText):
+ return "ttext"
+ raise TypeError(f"unsupported temporal type: {type(obj).__name__}")
+
+
+def _sidecar_values(obj: Any) -> "dict[str, Any]":
+ """Native-scalar bounds extracted O(1) from the value's inline bbox."""
+ from ..main import TPoint, TNumber
+
+ if isinstance(obj, TPoint):
+ box = obj.bounding_box()
+ out = {
+ "ts_min": box.tmin(),
+ "ts_max": box.tmax(),
+ "x_min": box.xmin(),
+ "x_max": box.xmax(),
+ "y_min": box.ymin(),
+ "y_max": box.ymax(),
+ }
+ if box.zmin() is not None:
+ out["z_min"] = box.zmin()
+ out["z_max"] = box.zmax()
+ return out
+ if isinstance(obj, TNumber):
+ box = obj.bounding_box()
+ return {
+ "ts_min": box.tmin(),
+ "ts_max": box.tmax(),
+ "v_min": box.xmin(),
+ "v_max": box.xmax(),
+ }
+ span = obj.bounding_box() # TsTzSpan for tbool / ttext
+ return {"ts_min": span.lower(), "ts_max": span.upper()}
+
+
+_TS_KEYS = {"ts_min", "ts_max"}
+
+
+def _sidecar_arrow_type(suffix: str):
+ pa = _pa()
+ return pa.timestamp("us", tz="UTC") if suffix in _TS_KEYS else pa.float64()
+
+
+# --------------------------------------------------------------------------- #
+# Columnar normalisation
+# --------------------------------------------------------------------------- #
+def _as_columns(data: Any) -> "dict[str, list]":
+ # pandas DataFrame
+ if hasattr(data, "to_dict") and hasattr(data, "columns"):
+ return {c: list(data[c]) for c in data.columns}
+ # Mapping of column -> sequence
+ if isinstance(data, Mapping):
+ return {k: list(v) for k, v in data.items()}
+ # Sequence of row mappings
+ rows = list(data)
+ if rows and isinstance(rows[0], Mapping):
+ keys = list(rows[0].keys())
+ return {k: [r.get(k) for r in rows] for k in keys}
+ raise TypeError(
+ "data must be a pandas DataFrame, a {column: sequence} mapping, "
+ "or a sequence of row dicts"
+ )
+
+
+def _is_temporal(value: Any) -> bool:
+ from ..temporal import Temporal
+
+ return isinstance(value, Temporal)
+
+
+def _detect_temporal_columns(
+ cols: "dict[str, list]", explicit: Optional[Sequence[str]]
+) -> "list[str]":
+ if explicit is not None:
+ return list(explicit)
+ detected = []
+ for name, values in cols.items():
+ if any(v is not None and _is_temporal(v) for v in values):
+ detected.append(name)
+ return detected
+
+
+# --------------------------------------------------------------------------- #
+# Arrow-native API
+# --------------------------------------------------------------------------- #
+def to_arrow(
+ data: Any,
+ *,
+ temporal_columns: Optional[Sequence[str]] = None,
+ sidecars: bool = True,
+):
+ """
+ Build a ``pyarrow.Table`` with WKB payload columns, native-scalar sidecars
+ and the ``temporal`` footer in the schema metadata.
+
+ Args:
+ data: a pandas ``DataFrame``, a ``{column: sequence}`` mapping, or a
+ sequence of row dicts.
+ temporal_columns: columns to encode; auto-detected (any column holding
+ ``Temporal`` values) when omitted.
+ sidecars: also emit ``_ts_min``/``ts_max`` and the per-type
+ spatial (``x/y/z``) or numeric (``v``) bound columns.
+
+ Returns:
+ ``pyarrow.Table``.
+ """
+ pa = _pa()
+ cols = _as_columns(data)
+ temporal_cols = _detect_temporal_columns(cols, temporal_columns)
+
+ arrays: "dict[str, Any]" = {}
+ footer_cols: "dict[str, str]" = {}
+
+ for name, values in cols.items():
+ if name not in temporal_cols:
+ arrays[name] = pa.array(values)
+ continue
+
+ base_type = None
+ wkb_col: "list[Optional[bytes]]" = []
+ per_row: "list[Optional[dict]]" = [] # row-aligned sidecar dicts
+ for v in values:
+ if v is None:
+ wkb_col.append(None)
+ per_row.append(None)
+ continue
+ if base_type is None:
+ base_type = _base_type_name(v)
+ wkb_col.append(v.as_wkb())
+ per_row.append(_sidecar_values(v) if sidecars else None)
+ if base_type is None:
+ raise ValueError(f"temporal column {name!r} is entirely null")
+ footer_cols[name] = base_type
+ arrays[name] = pa.array(wkb_col, type=pa.binary())
+ if sidecars:
+ suffixes: "list[str]" = []
+ for d in per_row:
+ for k in d or ():
+ if k not in suffixes:
+ suffixes.append(k)
+ for suffix in suffixes:
+ col = [None if d is None else d.get(suffix) for d in per_row]
+ arrays[f"{name}_{suffix}"] = pa.array(
+ col, type=_sidecar_arrow_type(suffix)
+ )
+
+ table = pa.table(arrays)
+ md = dict(table.schema.metadata or {})
+ md[FOOTER_KEY.encode()] = temporal_footer(footer_cols).encode("utf-8")
+ return table.replace_schema_metadata(md)
+
+
+def from_arrow(table, *, reconstruct: bool = True):
+ """
+ Reverse of :func:`to_arrow`. Reads the ``temporal`` footer and, when
+ ``reconstruct`` is set, rebuilds PyMEOS objects from the WKB columns
+ (the type is recovered from the WKB itself, the footer is advisory).
+
+ Returns a ``dict`` of ``{column: list}`` (sidecar columns preserved).
+ """
+ _pa()
+ from ..temporal import Temporal
+
+ footer = _parse_footer((table.schema.metadata or {}).get(FOOTER_KEY.encode()))
+ out: "dict[str, list]" = {}
+ for name in table.column_names:
+ values = table.column(name).to_pylist()
+ if reconstruct and name in footer:
+ out[name] = [None if v is None else Temporal.from_wkb(v) for v in values]
+ else:
+ out[name] = values
+ return out
+
+
+# --------------------------------------------------------------------------- #
+# Parquet file API
+# --------------------------------------------------------------------------- #
+def write_temporal(
+ data: Any,
+ path: str,
+ *,
+ temporal_columns: Optional[Sequence[str]] = None,
+ sidecars: bool = True,
+ row_group_size: Optional[int] = None,
+ **write_kwargs: Any,
+) -> None:
+ """
+ Write ``data`` to a Parquet file using the data-lake recipe. The
+ ``temporal`` footer lands in the Parquet ``KV_METADATA`` and the native
+ sidecar columns carry the row-group statistics used for pushdown.
+ """
+ pq = _pq()
+ table = to_arrow(data, temporal_columns=temporal_columns, sidecars=sidecars)
+ pq.write_table(table, path, row_group_size=row_group_size, **write_kwargs)
+
+
+def read_temporal(
+ path: str,
+ *,
+ columns: Optional[Sequence[str]] = None,
+ filters: Optional[Any] = None,
+ reconstruct: bool = True,
+):
+ """
+ Read a Parquet file written by :func:`write_temporal` (or MobilityDuck).
+
+ ``filters`` is passed straight to ``pyarrow.parquet``; predicates on the
+ native sidecar columns (e.g. ``[("trip_ts_min", "<=", end)]``) prune row
+ groups before any WKB is decoded.
+ """
+ pq = _pq()
+ table = pq.read_table(
+ path, columns=list(columns) if columns else None, filters=filters
+ )
+ return from_arrow(table, reconstruct=reconstruct)
diff --git a/pyproject.toml b/pyproject.toml
index d0b8aaa2..9f0fcd14 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -59,6 +59,10 @@ pandas = [
'geopandas'
]
+parquet = [
+ 'pyarrow'
+]
+
[project.urls]
'Homepage' = 'https://github.com/MobilityDB/PyMEOS/pymeos'
'Bug Tracker' = 'https://github.com/MobilityDB/PyMEOS/issues'
diff --git a/tests/io/test_parquet.py b/tests/io/test_parquet.py
new file mode 100644
index 00000000..fa5f8583
--- /dev/null
+++ b/tests/io/test_parquet.py
@@ -0,0 +1,97 @@
+from datetime import datetime, timedelta, timezone
+
+import pytest
+
+pytest.importorskip("pyarrow") # the optional `parquet` extra
+
+import pyarrow as pa # noqa: E402
+import pyarrow.parquet as pq # noqa: E402
+
+from pymeos import TGeogPointSeq, TFloatSeq # noqa: E402
+from pymeos.io.parquet import ( # noqa: E402
+ temporal_footer,
+ to_arrow,
+ from_arrow,
+ write_temporal,
+ read_temporal,
+)
+
+# Byte-for-byte output of MobilityDuck's temporalFooter() (interop contract).
+_MOBILITYDUCK_FOOTER = (
+ '{"version":"1.0.0","columns":{"traj":{"encoding":"MEOS-WKB",'
+ '"encoding_version":"1.0","base_type":"tgeogpoint"}}}'
+)
+
+
+def _base():
+ return datetime(2026, 1, 15, tzinfo=timezone.utc)
+
+
+def _traj(lon, lat, dlon, dlat):
+ pts = ", ".join(
+ f"Point({lon + dlon * s:.6f} {lat + dlat * s:.6f})@"
+ f"{(_base() + timedelta(minutes=10 * s)).isoformat()}"
+ for s in range(12)
+ )
+ return TGeogPointSeq(f"[{pts}]")
+
+
+def test_footer_is_byte_identical_to_mobilityduck():
+ assert temporal_footer({"traj": "tgeogpoint"}) == _MOBILITYDUCK_FOOTER
+
+
+def test_to_arrow_emits_wkb_payload_sidecars_and_footer():
+ rows = {
+ "entity_id": [1, 2],
+ "traj": [_traj(10, 55, 0.2, 0.05), _traj(14, 56, -0.1, -0.08)],
+ }
+ tbl = to_arrow(rows)
+
+ assert b"temporal" in (tbl.schema.metadata or {})
+ assert pa.types.is_binary(tbl.schema.field("traj").type)
+ for c in (
+ "traj_ts_min",
+ "traj_ts_max",
+ "traj_x_min",
+ "traj_x_max",
+ "traj_y_min",
+ "traj_y_max",
+ ):
+ assert c in tbl.column_names
+ assert pa.types.is_timestamp(tbl.schema.field("traj_ts_min").type)
+ assert pa.types.is_floating(tbl.schema.field("traj_x_min").type)
+
+
+def test_parquet_roundtrip_footer_and_sidecar_pushdown(tmp_path):
+ rows = {
+ "entity_id": [1, 2, 3],
+ "traj": [
+ _traj(10, 55, 0.2, 0.05),
+ _traj(8.5, 57.5, 0.06, -0.06),
+ _traj(9.5, 54.5, 0.22, 0.06),
+ ],
+ }
+ path = str(tmp_path / "demo.parquet")
+ write_temporal(rows, path, row_group_size=1)
+
+ assert pq.read_metadata(path).metadata[b"temporal"] == _MOBILITYDUCK_FOOTER.encode()
+
+ # every trajectory starts at _base(); a >= cutoff filter prunes everything
+ cutoff = _base() + timedelta(minutes=30)
+ pruned = read_temporal(path, filters=[("traj_ts_min", ">=", cutoff)])
+ assert len(pruned["entity_id"]) == 0
+
+ full = read_temporal(path)
+ assert len(full["entity_id"]) == 3
+ assert type(full["traj"][0]).__name__.startswith("TGeogPoint")
+ assert full["traj"][0] == rows["traj"][0] # WKB round-trip is lossless
+
+
+def test_tfloat_numeric_sidecars_roundtrip():
+ data = {"id": [1], "v": [TFloatSeq("[1@2026-01-01, 9@2026-01-02]")]}
+ tbl = to_arrow(data)
+ assert {"v_v_min", "v_v_max", "v_ts_min", "v_ts_max"}.issubset(
+ set(tbl.column_names)
+ )
+ back = from_arrow(tbl)
+ assert back["v"][0] == data["v"][0]