From 2cc4aa1d12b7c505ac9928d3aeef1c3d747c31b3 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Wed, 17 Jun 2026 14:29:11 +0530 Subject: [PATCH 1/5] feat(feather): add positional Arrow lookup provider FeatherLookupProvider + FeatherSidecarBuilder behind a feather-provider feature: a sparse-rowid positional sidecar for DuckLake vector search, served through the existing PointLookupProvider trait. Hydration is binary-search (partition_point) over the sorted rowid column plus an exact-match guard, then take(positions) on verbatim Arrow columns - no SQLite/serde_json dep and no Arrow<->SQL type mapping. The builder sorts at finish, so correctness does not depend on the DuckLake scan order. Tests cross-check row/key/order/projection parity against the SQLite provider on the supported types, and verify lossless round-trip of types SQLite rejects (Decimal/FixedSizeList/Struct/Dictionary/nested List). --- Cargo.lock | 1 + Cargo.toml | 6 + src/feather_provider.rs | 448 ++++++++++++++++++++++++ src/lib.rs | 4 + tests/feather_provider_test.rs | 610 +++++++++++++++++++++++++++++++++ 5 files changed, 1069 insertions(+) create mode 100644 src/feather_provider.rs create mode 100644 tests/feather_provider_test.rs diff --git a/Cargo.lock b/Cargo.lock index c850dbb..67710bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1405,6 +1405,7 @@ name = "datafusion-vector-search-ext" version = "0.1.0" dependencies = [ "arrow-array", + "arrow-ipc", "arrow-schema", "async-trait", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 46962d8..c7ad244 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ license = "MIT OR Apache-2.0" [features] parquet-provider = ["dep:parquet", "dep:object_store", "dep:bytes"] sqlite-provider = ["dep:rusqlite", "dep:serde_json", "dep:parquet"] +feather-provider = ["dep:arrow-ipc"] [dependencies] tracing = "0.1" @@ -29,6 +30,11 @@ bytes = { version = "1", optional = true } rusqlite = { version = "0.32", optional = true, features = ["bundled"] } serde_json = { version = "1", optional = true } +# feather-provider — Arrow IPC (Feather v2) read/write. `take`, `sort_to_indices`, +# and `concat_batches` come from `datafusion::arrow::compute`, so the only extra +# dep is the IPC reader/writer. +arrow-ipc = { version = "58", optional = true } + [dev-dependencies] tokio = { version = "1", features = ["rt-multi-thread", "macros"] } tempfile = "3" diff --git a/src/feather_provider.rs b/src/feather_provider.rs new file mode 100644 index 0000000..909155b --- /dev/null +++ b/src/feather_provider.rs @@ -0,0 +1,448 @@ +// feather_provider.rs — Arrow/Feather (Arrow IPC) positional PointLookupProvider. +// +// A drop-in alternative to `SqliteLookupProvider` for the DuckLake vector-search +// payload sidecar. Where SQLite stores rows in a B-tree keyed by `rowid` and +// hydrates via `WHERE rowid IN (...)`, this provider stores the payload as a +// single Arrow IPC file sorted ascending by the (sparse / holey) `rowid`, and +// hydrates by: +// +// 1. binary-searching the sorted `rowid` column for each requested key +// (`slice::partition_point`), +// 2. guarding with an exact-match check (`rowid[pos] == key`) so a missing +// rowid never aliases its neighbour, then +// 3. `take`-ing the matched physical positions out of the payload columns. +// +// The sorted `rowid` column *is* the index — no separate structure. Because the +// payload is stored verbatim Arrow, hydration is `select(proj).take(positions)` +// with zero type conversion: no `arrow_cell_to_sql` on build, no +// `sql_values_to_arrow` on read, and no rejection/coercion of Decimal / Struct / +// Map / FixedSizeList / Dictionary / Timestamp(tz) / nested List. +// +// Scope: this is the PoC primitive (ticket #702). The whole file is read into +// memory once at open and held resident (the "expand to uncompressed on NVMe +// before querying" model). mmap demand-paging and a coarse per-batch index for +// multi-GB sidecars are documented follow-ups, not implemented here. + +use std::any::Any; +use std::fmt; +use std::sync::Arc; + +use arrow_array::{Array, ArrayRef, RecordBatch, UInt32Array, UInt64Array}; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion::arrow::compute; +use datafusion::catalog::{Session, TableProvider}; +use datafusion::common::Result as DFResult; +use datafusion::datasource::MemTable; +use datafusion::error::DataFusionError; +use datafusion::logical_expr::{Expr, TableType}; +use datafusion::physical_plan::ExecutionPlan; + +use crate::lookup::{PointLookupProvider, extract_keys_as_u64}; + +// ── Provider ────────────────────────────────────────────────────────────────── + +/// In-memory, sorted-by-key Arrow positional [`PointLookupProvider`]. +/// +/// Holds the full payload as one concatenated [`RecordBatch`] (sorted ascending +/// by the key column, which is field 0 of the schema) plus a contiguous +/// `Vec` of the key values for binary search. Built by +/// [`FeatherSidecarBuilder`] or opened from an existing `.feather` file with +/// [`open`](Self::open). +pub struct FeatherLookupProvider { + schema: SchemaRef, + /// Payload rows, sorted ascending by `keys[i]`. Row `i` of every column + /// corresponds to `keys[i]`. + batch: RecordBatch, + /// Sorted ascending key values, one per row of `batch`. The contiguous + /// `rowid` index: `partition_point` over this maps a key → physical row. + keys: Arc>, + key_col: String, +} + +impl fmt::Debug for FeatherLookupProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "FeatherLookupProvider(key_col={}, rows={}, schema_cols={})", + self.key_col, + self.keys.len(), + self.schema.fields().len() + ) + } +} + +impl FeatherLookupProvider { + /// Open an existing `.feather` (Arrow IPC file) sidecar. + /// + /// Reads every batch, concatenates them into one resident batch, and lifts + /// the key column (field 0) into a contiguous `Vec`. The on-disk schema + /// is self-describing, so the provider's [`schema`](PointLookupProvider::schema) + /// is taken from the file verbatim. Fails if the stored key column is not + /// sorted ascending — `fetch_by_keys`' binary search depends on it, and the + /// builder always writes sorted, so an unsorted file signals corruption. + pub fn open(path: &str) -> DFResult { + let file = std::fs::File::open(path) + .map_err(|e| DataFusionError::Execution(format!("open feather sidecar {path}: {e}")))?; + let reader = arrow_ipc::reader::FileReader::try_new(file, None) + .map_err(|e| DataFusionError::Execution(format!("read feather sidecar {path}: {e}")))?; + let schema = reader.schema(); + let batches = reader.collect::, _>>().map_err(|e| { + DataFusionError::Execution(format!("decode feather sidecar {path}: {e}")) + })?; + Self::from_batches(schema, batches) + } + + /// Build a provider from already-decoded, sorted-by-key batches. Shared by + /// [`open`](Self::open) and [`FeatherSidecarBuilder::finish`]. + fn from_batches(schema: SchemaRef, batches: Vec) -> DFResult { + if schema.fields().is_empty() { + return Err(DataFusionError::Execution( + "FeatherLookupProvider: schema has no columns; field 0 must be the key column" + .into(), + )); + } + let key_col = schema.field(0).name().clone(); + + // One contiguous batch so `take` addresses a global physical position. + let batch = if batches.is_empty() { + RecordBatch::new_empty(schema.clone()) + } else { + compute::concat_batches(&schema, &batches) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))? + }; + + let keys: Vec = extract_keys_as_u64(batch.column(0).as_ref())? + .into_iter() + .map(|k| { + k.ok_or_else(|| { + DataFusionError::Execution( + "FeatherLookupProvider: key column has a null value; \ + row keys must be non-null" + .into(), + ) + }) + }) + .collect::>()?; + + // Binary search requires ascending keys. The builder guarantees this; + // verify defensively so a bad file fails loudly at open, not silently + // at query time with wrong rows. + if keys.windows(2).any(|w| w[0] > w[1]) { + return Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: key column '{key_col}' is not sorted ascending; \ + the sidecar is corrupt or was not built by FeatherSidecarBuilder" + ))); + } + + Ok(Self { + schema, + batch, + keys: Arc::new(keys), + key_col, + }) + } + + pub fn len(&self) -> usize { + self.keys.len() + } + pub fn is_empty(&self) -> bool { + self.keys.is_empty() + } +} + +#[async_trait] +impl PointLookupProvider for FeatherLookupProvider { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + async fn fetch_by_keys( + &self, + keys: &[u64], + _key_col: &str, + projection: Option<&[usize]>, + ) -> DFResult> { + if keys.is_empty() { + return Ok(vec![]); + } + + // Sort + dedup the requested keys. Sorting makes the resulting physical + // positions ascending, so the output is ordered by key — matching + // SQLite's `ORDER BY rowid`. Dedup matches `WHERE key IN (...)`, which + // returns one row per distinct key regardless of repeats. + let mut want = keys.to_vec(); + want.sort_unstable(); + want.dedup(); + + // Map each requested key to a physical position via binary search, with + // an exact-match guard so an absent key is skipped rather than aliased + // onto its lower-bound neighbour. + let mut positions: Vec = Vec::with_capacity(want.len()); + for &k in &want { + let pos = self.keys.partition_point(|&v| v < k); + if pos < self.keys.len() && self.keys[pos] == k { + positions.push(pos as u64); + } + } + if positions.is_empty() { + return Ok(vec![]); + } + + // Columns to read, in output order. Projection indexes into the provider + // schema (0 = key column), mirroring the SQLite provider's contract. + let col_indices: Vec = match projection { + None => (0..self.schema.fields().len()).collect(), + Some(idxs) => idxs.to_vec(), + }; + let out_schema: SchemaRef = match projection { + None => self.schema.clone(), + Some(idxs) => Arc::new(arrow_schema::Schema::new( + idxs.iter() + .map(|&i| self.schema.field(i).clone()) + .collect::>(), + )), + }; + + let pos_arr = UInt64Array::from(positions); + let cols: Vec = col_indices + .iter() + .map(|&i| { + compute::take(self.batch.column(i).as_ref(), &pos_arr, None) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + }) + .collect::>()?; + + let batch = RecordBatch::try_new(out_schema, cols) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + Ok(vec![batch]) + } +} + +// ── TableProvider ───────────────────────────────────────────────────────────── +// +// Mirrors `SqliteLookupProvider` / `HashKeyProvider`: the payload is already +// resident, so a full scan is a cheap MemTable over the single batch. This lets +// DataFusion resolve column names when the provider is registered as a table. + +#[async_trait] +impl TableProvider for FeatherLookupProvider { + fn as_any(&self) -> &dyn Any { + self + } + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + fn table_type(&self) -> TableType { + TableType::Base + } + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + let mem = MemTable::try_new(self.schema.clone(), vec![vec![self.batch.clone()]])?; + mem.scan(state, projection, &[], None).await + } +} + +// ── Streaming sidecar builder ─────────────────────────────────────────────── + +/// Incremental builder for a [`FeatherLookupProvider`], the Feather analogue of +/// [`SqliteSidecarBuilder`](crate::sqlite_provider::SqliteSidecarBuilder). +/// +/// Takes input [`RecordBatch`]es one at a time (e.g. from the DuckLake +/// snapshot-pinned, row-lineage scan), reading each row's key from a designated +/// column and projecting the value columns into the output schema. On +/// [`finish`](Self::finish) the buffered rows are sorted ascending by key and +/// written as one Arrow IPC batch. +/// +/// **Sort-agnostic by design.** Unlike the SQLite B-tree (which is order +/// independent), Feather binary search requires the on-disk key column sorted. +/// Rather than depend on the DuckLake scan emitting sorted rowids — the ticket's +/// top open risk — this builder sorts at `finish`, so correctness holds for any +/// input order. Whether the input was *already* sorted is tracked and reported +/// via [`input_was_sorted`](Self::input_was_sorted), feeding the decision on +/// whether a bounded-memory merge (instead of the in-memory sort) is needed at +/// production scale. +/// +/// **Memory:** the PoC buffers all projected rows before sorting — O(N) resident. +/// A bounded-memory external merge-of-sorted-runs is the documented scale plan; +/// it is not implemented here. +/// +/// The first field of `schema` is the key column; fields 1.. are the stored +/// value columns. `key_col_index` / `value_col_indices` index into the *input* +/// batches passed to [`push_batch`](Self::push_batch) (matching +/// [`SqliteSidecarBuilder::begin`](crate::sqlite_provider::SqliteSidecarBuilder::begin)). +pub struct FeatherSidecarBuilder { + path: String, + schema: SchemaRef, + key_col_index: usize, + value_col_indices: Vec, + /// Projected batches conforming to `schema`, accumulated across push_batch. + buffered: Vec, + /// Largest key seen so far, to detect whether the input stream is already + /// globally sorted ascending. + last_key: Option, + input_sorted: bool, +} + +impl FeatherSidecarBuilder { + /// Begin a build targeting `path` (the output `.feather` file). + /// + /// `schema` is the output schema — field 0 is the key column, fields 1.. are + /// the stored value columns, verbatim Arrow (no type validation: storing + /// types SQLite rejects is the point). `key_col_index` and + /// `value_col_indices` index into the input batches. + pub fn begin( + path: &str, + schema: SchemaRef, + key_col_index: usize, + value_col_indices: Vec, + ) -> DFResult { + if schema.fields().is_empty() { + return Err(DataFusionError::Execution( + "FeatherSidecarBuilder: schema has no columns; field 0 must be the key column" + .into(), + )); + } + if schema.fields().len() != value_col_indices.len() + 1 { + return Err(DataFusionError::Execution(format!( + "FeatherSidecarBuilder: schema has {} fields but expected 1 key column + {} value columns", + schema.fields().len(), + value_col_indices.len() + ))); + } + Ok(Self { + path: path.to_string(), + schema, + key_col_index, + value_col_indices, + buffered: Vec::new(), + last_key: None, + input_sorted: true, + }) + } + + /// Project and buffer every row of `batch`. The key column is read from + /// `key_col_index`; value columns from `value_col_indices`, in order, into + /// schema fields 1.. . Type mismatches between the input columns and the + /// declared schema surface here (via `RecordBatch::try_new`). + pub fn push_batch(&mut self, batch: &RecordBatch) -> DFResult<()> { + let ncols = batch.num_columns(); + if self.key_col_index >= ncols { + return Err(DataFusionError::Execution(format!( + "FeatherSidecarBuilder: key_col_index {} out of range for batch with {ncols} columns", + self.key_col_index + ))); + } + if let Some(&bad) = self.value_col_indices.iter().find(|&&i| i >= ncols) { + return Err(DataFusionError::Execution(format!( + "FeatherSidecarBuilder: value column index {bad} out of range for batch with {ncols} columns" + ))); + } + + let key_col = batch.column(self.key_col_index); + if key_col.null_count() > 0 { + return Err(DataFusionError::Execution( + "FeatherSidecarBuilder: key column has a null value; row keys must be non-null" + .into(), + )); + } + + // Track global sortedness while we have the keys in hand. + let keys = extract_keys_as_u64(key_col.as_ref())?; + for k in keys.into_iter().flatten() { + if self.last_key.is_some_and(|prev| k < prev) { + self.input_sorted = false; + } + self.last_key = Some(k); + } + + // Project input columns into output-schema order: key first, then values. + let mut cols: Vec = Vec::with_capacity(self.value_col_indices.len() + 1); + cols.push(batch.column(self.key_col_index).clone()); + for &ci in &self.value_col_indices { + cols.push(batch.column(ci).clone()); + } + let projected = RecordBatch::try_new(self.schema.clone(), cols) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + self.buffered.push(projected); + Ok(()) + } + + /// Whether every row pushed so far arrived in ascending key order (i.e. the + /// in-memory sort at `finish` was a no-op). Informational: feeds the + /// production decision on whether the DuckLake scan can be relied on to emit + /// sorted rowids (skipping the sort) or needs a merge step. + pub fn input_was_sorted(&self) -> bool { + self.input_sorted + } + + /// Sort buffered rows ascending by key, write them as a single Arrow IPC + /// batch to `path`, and open a [`FeatherLookupProvider`] over the result. + pub fn finish(self) -> DFResult { + let combined = if self.buffered.is_empty() { + RecordBatch::new_empty(self.schema.clone()) + } else { + compute::concat_batches(&self.schema, &self.buffered) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))? + }; + + // Sort ascending by the key column (field 0). Always performed so the + // file is sorted regardless of input order; cheap when already sorted. + let sorted = if combined.num_rows() == 0 { + combined + } else { + let indices = compute::sort_to_indices(combined.column(0).as_ref(), None, None) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + sort_batch(&combined, &indices)? + }; + + write_ipc_file(&self.path, &sorted)?; + + tracing::info!( + "Feather sidecar '{}' built: {} rows, input_already_sorted={}.", + self.path, + sorted.num_rows(), + self.input_sorted, + ); + + FeatherLookupProvider::from_batches(self.schema, vec![sorted]) + } +} + +/// `take` every column of `batch` by `indices`, preserving the schema. +fn sort_batch(batch: &RecordBatch, indices: &UInt32Array) -> DFResult { + let cols: Vec = batch + .columns() + .iter() + .map(|c| { + compute::take(c.as_ref(), indices, None) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + }) + .collect::>()?; + RecordBatch::try_new(batch.schema(), cols) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) +} + +/// Write a single batch as an Arrow IPC *file* (Feather v2). Uncompressed: the +/// fast path needs on-disk bytes == in-memory layout (zero-copy take), and IPC +/// whole-buffer compression would force a full-column decode per scattered row. +fn write_ipc_file(path: &str, batch: &RecordBatch) -> DFResult<()> { + let file = std::fs::File::create(path) + .map_err(|e| DataFusionError::Execution(format!("create feather sidecar {path}: {e}")))?; + let mut writer = arrow_ipc::writer::FileWriter::try_new(file, &batch.schema()) + .map_err(|e| DataFusionError::Execution(format!("init feather writer {path}: {e}")))?; + if batch.num_rows() > 0 { + writer + .write(batch) + .map_err(|e| DataFusionError::Execution(format!("write feather batch {path}: {e}")))?; + } + writer + .finish() + .map_err(|e| DataFusionError::Execution(format!("finalize feather sidecar {path}: {e}")))?; + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index a5f9eab..1a98efb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,6 +65,8 @@ pub mod rule; pub mod udf; pub mod udtf; +#[cfg(feature = "feather-provider")] +pub mod feather_provider; #[cfg(feature = "parquet-provider")] pub mod parquet_provider; #[cfg(feature = "sqlite-provider")] @@ -82,6 +84,8 @@ pub use rule::USearchRule; pub use udf::{cosine_distance_udf, l2_distance_udf, negative_dot_product_udf}; pub use udtf::VectorSearchVectorUDTF; +#[cfg(feature = "feather-provider")] +pub use feather_provider::{FeatherLookupProvider, FeatherSidecarBuilder}; #[cfg(feature = "parquet-provider")] pub use parquet_provider::ParquetLookupProvider; #[cfg(feature = "sqlite-provider")] diff --git a/tests/feather_provider_test.rs b/tests/feather_provider_test.rs new file mode 100644 index 0000000..19fbf09 --- /dev/null +++ b/tests/feather_provider_test.rs @@ -0,0 +1,610 @@ +#![cfg(feature = "feather-provider")] +//! Tests for `FeatherLookupProvider` / `FeatherSidecarBuilder` (ticket #702). +//! +//! Coverage: +//! - **Parity** (`#[cfg(feature = "sqlite-provider")]`): a Rust port of the +//! benchmark's `verify_same_data.py` — Feather returns the same rows as the +//! SQLite provider for sparse/holey rowids, including edges and absent keys. +//! - **Type fidelity**: Feather round-trips Decimal / FixedSizeList / Struct / +//! Dictionary / nested-List losslessly — the types `SqliteSidecarBuilder` +//! rejects at build time. +//! - **Projection**, **build-order / sort-agnosticism**, and an informational +//! **footprint + build-time** comparison. + +use std::sync::Arc; + +use arrow_array::builder::{FixedSizeListBuilder, Float64Builder, ListBuilder}; +use arrow_array::types::Int32Type; +use arrow_array::{ + Array, ArrayRef, Decimal128Array, DictionaryArray, Int64Array, RecordBatch, StringArray, + StructArray, TimestampMicrosecondArray, +}; +use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit}; +use datafusion::arrow::util::pretty::pretty_format_batches; +use datafusion_vector_search_ext::{ + FeatherLookupProvider, FeatherSidecarBuilder, PointLookupProvider, +}; +use tempfile::tempdir; + +// ── Synthetic sparse-rowid payload (DuckLake-shaped) ──────────────────────────── + +/// Deterministic PRNG (SplitMix64) so tests are reproducible without a `rand` dep. +struct Rng(u64); +impl Rng { + fn next(&mut self) -> u64 { + self.0 = self.0.wrapping_add(0x9E3779B97F4A7C15); + let mut z = self.0; + z = (z ^ (z >> 30)).wrapping_mul(0xBF58476D1CE4E5B9); + z = (z ^ (z >> 27)).wrapping_mul(0x94D049BB133111EB); + z ^ (z >> 31) + } + fn below(&mut self, n: u64) -> u64 { + self.next() % n + } +} + +/// Columns mirror the benchmark payload: rowid (key) + id/url/title/sha/raw/filename. +/// `title` is intermittently null to exercise null handling. +fn payload_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("id", DataType::Int64, false), + Field::new("url", DataType::Utf8, true), + Field::new("title", DataType::Utf8, true), + Field::new("sha", DataType::Utf8, true), + Field::new("raw", DataType::Utf8, true), + Field::new("filename", DataType::Utf8, true), + ])) +} + +/// Generate `n` rows with strictly-increasing **sparse** rowids (~60% holes: +/// gaps drawn from 1..=4), chunked into `chunk`-row batches. Returns the schema, +/// the batches (in ascending-key order), and the sorted rowid list. +fn gen_payload(n: usize, chunk: usize, seed: u64) -> (SchemaRef, Vec, Vec) { + let schema = payload_schema(); + let mut rng = Rng(seed); + + let mut rowids: Vec = Vec::with_capacity(n); + let mut cur: i64 = 5; + for _ in 0..n { + cur += 1 + rng.below(4) as i64; // gap 1..=4 → sparse, strictly increasing + rowids.push(cur); + } + + let mut batches = Vec::new(); + for start in (0..n).step_by(chunk) { + let end = (start + chunk).min(n); + let rid: Vec = rowids[start..end].to_vec(); + let id: Vec = rid.iter().map(|r| r.wrapping_mul(7)).collect(); + let url: Vec> = rid + .iter() + .map(|r| Some(format!("https://example.com/doc/{r}"))) + .collect(); + let title: Vec> = rid + .iter() + .map(|r| (r % 7 != 0).then(|| format!("Title for row {r}"))) + .collect(); + let sha: Vec> = rid.iter().map(|r| Some(format!("{r:040x}"))).collect(); + let raw: Vec> = rid + .iter() + .map(|r| Some(format!("raw payload bytes for row {r}: {}", "x".repeat(40)))) + .collect(); + let filename: Vec> = + rid.iter().map(|r| Some(format!("file_{r}.txt"))).collect(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(rid)), + Arc::new(Int64Array::from(id)), + Arc::new(StringArray::from(url)), + Arc::new(StringArray::from(title)), + Arc::new(StringArray::from(sha)), + Arc::new(StringArray::from(raw)), + Arc::new(StringArray::from(filename)), + ], + ) + .unwrap(); + batches.push(batch); + } + (schema, batches, rowids) +} + +fn build_feather( + dir: &tempfile::TempDir, + schema: SchemaRef, + batches: &[RecordBatch], +) -> FeatherLookupProvider { + let path = dir.path().join("payload.feather"); + let value_cols: Vec = (1..schema.fields().len()).collect(); + let mut builder = + FeatherSidecarBuilder::begin(path.to_str().unwrap(), schema, 0, value_cols).unwrap(); + for b in batches { + builder.push_batch(b).unwrap(); + } + builder.finish().unwrap() +} + +fn fmt(batches: &[RecordBatch]) -> String { + pretty_format_batches(batches).unwrap().to_string() +} + +// ── A. Provider correctness & parity ─────────────────────────────────────────── + +#[cfg(feature = "sqlite-provider")] +fn build_sqlite( + dir: &tempfile::TempDir, + schema: SchemaRef, + batches: &[RecordBatch], +) -> datafusion_vector_search_ext::SqliteLookupProvider { + use datafusion_vector_search_ext::SqliteSidecarBuilder; + let path = dir.path().join("payload.db"); + let value_cols: Vec = (1..schema.fields().len()).collect(); + let mut builder = + SqliteSidecarBuilder::begin(path.to_str().unwrap(), "payload", 4, schema, 0, value_cols) + .unwrap(); + for b in batches { + builder.push_batch(b).unwrap(); + } + builder.finish().unwrap() +} + +#[cfg(feature = "sqlite-provider")] +#[tokio::test] +async fn parity_with_sqlite_sparse_rowids() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(3000, 256, 0xC0FFEE); + + let feather = build_feather(&dir, schema.clone(), &batches); + let sqlite = build_sqlite(&dir, schema.clone(), &batches); + + let n = rowids.len(); + let mut rng = Rng(42); + let pick = |rng: &mut Rng, count: usize| -> Vec { + (0..count) + .map(|_| rowids[rng.below(n as u64) as usize] as u64) + .collect() + }; + + // Build a battery of key sets covering scattered, edges, duplicates, absent. + let min = rowids[0] as u64; + let max = *rowids.last().unwrap() as u64; + let mid = rowids[n / 2] as u64; + + let mut key_sets: Vec<(&str, Vec)> = vec![ + ("empty", vec![]), + ("smallest", vec![min]), + ("largest", vec![max]), + ("mid", vec![mid]), + ("edges_together", vec![min, mid, max]), + ("duplicates", vec![mid, mid, min, min, max]), + // Absent: below min, above max, and a value in a gap between two rowids. + ("absent_below", vec![0]), + ("absent_above", vec![max + 1000]), + ("absent_in_gap", vec![rowids[10] as u64 + 1]), + ("mixed_present_absent", vec![min, 0, mid, max + 1000, max]), + ]; + for k in [1usize, 10, 100, 1000] { + key_sets.push(("scattered", pick(&mut rng, k))); + } + + for (label, keys) in &key_sets { + let f = feather.fetch_by_keys(keys, "rowid", None).await.unwrap(); + let s = sqlite.fetch_by_keys(keys, "rowid", None).await.unwrap(); + assert_eq!( + fmt(&f), + fmt(&s), + "feather vs sqlite mismatch for key set '{label}' ({} keys)", + keys.len() + ); + } +} + +#[cfg(feature = "sqlite-provider")] +#[tokio::test] +async fn parity_with_sqlite_projection() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(1500, 512, 7); + let feather = build_feather(&dir, schema.clone(), &batches); + let sqlite = build_sqlite(&dir, schema.clone(), &batches); + + let keys: Vec = rowids.iter().step_by(13).map(|&r| r as u64).collect(); + + // narrow (rowid, title, filename) and full + a reordered projection. + for proj in [ + vec![0usize, 3, 6], + (0..schema.fields().len()).collect::>(), + vec![3, 0, 6, 1], + ] { + let f = feather + .fetch_by_keys(&keys, "rowid", Some(&proj)) + .await + .unwrap(); + let s = sqlite + .fetch_by_keys(&keys, "rowid", Some(&proj)) + .await + .unwrap(); + assert_eq!(fmt(&f), fmt(&s), "projection {proj:?} mismatch"); + // Only the projected columns are present, in projection order. + assert_eq!(f[0].num_columns(), proj.len()); + for (out_i, &src_i) in proj.iter().enumerate() { + assert_eq!( + f[0].schema().field(out_i).name(), + schema.field(src_i).name() + ); + } + } +} + +/// The exact-match guard: an absent key (one that falls between two stored +/// rowids, or below/above the range) must never alias onto a neighbour. After +/// hydration the returned rowid column equals exactly the present, sorted, +/// deduped requested keys. +#[tokio::test] +async fn binary_search_lands_exactly() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(2000, 256, 99); + let feather = build_feather(&dir, schema, &batches); + + let present: Vec = rowids.iter().step_by(7).map(|&r| r as u64).collect(); + let absent: Vec = rowids + .iter() + .step_by(7) + .map(|&r| r as u64 + 1) // gaps are ≥1, so +1 may or may not exist; filter below + .filter(|k| rowids.binary_search(&(*k as i64)).is_err()) + .collect(); + + let mut request = present.clone(); + request.extend(&absent); + request.push(0); + request.push(*rowids.last().unwrap() as u64 + 9999); + + let out = feather + .fetch_by_keys(&request, "rowid", None) + .await + .unwrap(); + let got: Vec = out[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(); + + let mut expected: Vec = present.iter().map(|&k| k as i64).collect(); + expected.sort_unstable(); + expected.dedup(); + assert_eq!(got, expected, "binary search aliased an absent key"); +} + +#[tokio::test] +async fn empty_and_all_absent_return_no_batches() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(500, 128, 3); + let feather = build_feather(&dir, schema, &batches); + + assert!( + feather + .fetch_by_keys(&[], "rowid", None) + .await + .unwrap() + .is_empty() + ); + let absent = vec![0u64, 1, *rowids.last().unwrap() as u64 + 1_000_000]; + assert!( + feather + .fetch_by_keys(&absent, "rowid", None) + .await + .unwrap() + .is_empty() + ); +} + +// ── B. Type fidelity ──────────────────────────────────────────────────────────── + +/// Build a payload with types `SqliteSidecarBuilder` rejects, and confirm Feather +/// round-trips them losslessly — same Arrow types out, same values. +#[tokio::test] +async fn type_fidelity_roundtrip() { + let dir = tempdir().unwrap(); + + let struct_fields = Fields::from(vec![ + Field::new("lat", DataType::Float64, false), + Field::new("lon", DataType::Float64, false), + ]); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("price", DataType::Decimal128(20, 4), false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("America/New_York".into())), + false, + ), + Field::new( + "embedding3", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float64, true)), 3), + false, + ), + Field::new("loc", DataType::Struct(struct_fields.clone()), false), + Field::new( + "tag", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + ), + Field::new( + "scores", + DataType::List(Arc::new(Field::new("item", DataType::Float64, true))), + true, + ), + ])); + + let rowid = Int64Array::from(vec![10_i64, 20, 30]); + let price = Decimal128Array::from(vec![12345_i128, -98765, 0]) + .with_precision_and_scale(20, 4) + .unwrap(); + let ts = TimestampMicrosecondArray::from(vec![1_000_000_i64, 2_000_000, 3_000_000]) + .with_timezone("America/New_York"); + + let mut fsl = FixedSizeListBuilder::new(Float64Builder::new(), 3); + for triple in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] { + for v in triple { + fsl.values().append_value(v); + } + fsl.append(true); + } + let embedding3 = fsl.finish(); + + let loc = StructArray::from(vec![ + ( + Arc::new(Field::new("lat", DataType::Float64, false)), + Arc::new(arrow_array::Float64Array::from(vec![40.0, 41.0, 42.0])) as ArrayRef, + ), + ( + Arc::new(Field::new("lon", DataType::Float64, false)), + Arc::new(arrow_array::Float64Array::from(vec![-74.0, -75.0, -76.0])) as ArrayRef, + ), + ]); + + let tag: DictionaryArray = vec![Some("nlp"), Some("vision"), Some("nlp")] + .into_iter() + .collect(); + + let mut scores = ListBuilder::new(Float64Builder::new()); + scores.values().append_value(0.1); + scores.values().append_value(0.2); + scores.append(true); + scores.values().append_value(0.3); + scores.append(true); + scores.append(false); // null list + let scores = scores.finish(); + + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(rowid), + Arc::new(price), + Arc::new(ts), + Arc::new(embedding3), + Arc::new(loc), + Arc::new(tag), + Arc::new(scores), + ], + ) + .unwrap(); + + let feather = build_feather(&dir, schema.clone(), std::slice::from_ref(&input)); + let out = feather + .fetch_by_keys(&[10, 20, 30], "rowid", None) + .await + .unwrap(); + + // Types preserved exactly (not coerced to i64/JSON/TEXT). + for i in 0..schema.fields().len() { + assert_eq!( + out[0].schema().field(i).data_type(), + schema.field(i).data_type(), + "type of column '{}' was not preserved", + schema.field(i).name() + ); + } + // Values preserved exactly (input is already key-sorted, so rows align). + assert_eq!(fmt(&out), fmt(std::slice::from_ref(&input))); +} + +/// The contrast: `SqliteSidecarBuilder::begin` rejects each of these payload +/// types at build time (validated against its supported set), whereas Feather +/// accepts them (proven above). +#[cfg(feature = "sqlite-provider")] +#[test] +fn sqlite_rejects_types_feather_accepts() { + use datafusion_vector_search_ext::SqliteSidecarBuilder; + let dir = tempdir().unwrap(); + + let exotic: Vec<(&str, DataType)> = vec![ + ("decimal", DataType::Decimal128(20, 4)), + ( + "fixed_size_list", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float64, true)), 3), + ), + ( + "struct", + DataType::Struct(Fields::from(vec![Field::new( + "a", + DataType::Float64, + false, + )])), + ), + ( + "dictionary", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + ), + ( + "list_of_float", + DataType::List(Arc::new(Field::new("item", DataType::Float64, true))), + ), + ]; + + for (name, dt) in exotic { + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new(name, dt, true), + ])); + let db = dir.path().join(format!("{name}.db")); + let res = + SqliteSidecarBuilder::begin(db.to_str().unwrap(), "t", 1, schema.clone(), 0, vec![1]); + assert!( + res.is_err(), + "expected SQLite to reject payload type for column '{name}'" + ); + + // Feather accepts the same schema at begin. + let feather_path = dir.path().join(format!("{name}.feather")); + assert!( + FeatherSidecarBuilder::begin(feather_path.to_str().unwrap(), schema, 0, vec![1]) + .is_ok(), + "expected Feather to accept payload type for column '{name}'" + ); + } +} + +// ── D/E. Build path: sort-agnosticism & ordering ──────────────────────────────── + +/// The builder must produce a sorted-by-rowid file regardless of input order, +/// and report whether the input was already sorted. +#[tokio::test] +async fn builder_is_sort_agnostic() { + let dir = tempdir().unwrap(); + let (schema, sorted_batches, rowids) = gen_payload(1000, 100, 5); + + // Sorted input → reported sorted; output sorted. + let path_sorted = dir.path().join("sorted.feather"); + let value_cols: Vec = (1..schema.fields().len()).collect(); + let mut b = FeatherSidecarBuilder::begin( + path_sorted.to_str().unwrap(), + schema.clone(), + 0, + value_cols.clone(), + ) + .unwrap(); + for batch in &sorted_batches { + b.push_batch(batch).unwrap(); + } + assert!(b.input_was_sorted(), "ascending input should report sorted"); + let p_sorted = b.finish().unwrap(); + + // Shuffled input (reversed batch order + reversed rows) → reported unsorted, + // but output is still correctly sorted and identical to the sorted build. + let path_shuf = dir.path().join("shuffled.feather"); + let mut b2 = + FeatherSidecarBuilder::begin(path_shuf.to_str().unwrap(), schema.clone(), 0, value_cols) + .unwrap(); + for batch in sorted_batches.iter().rev() { + b2.push_batch(&reverse_rows(batch)).unwrap(); + } + assert!( + !b2.input_was_sorted(), + "shuffled input should report unsorted" + ); + let p_shuf = b2.finish().unwrap(); + + let all: Vec = rowids.iter().map(|&r| r as u64).collect(); + let from_sorted = p_sorted.fetch_by_keys(&all, "rowid", None).await.unwrap(); + let from_shuf = p_shuf.fetch_by_keys(&all, "rowid", None).await.unwrap(); + assert_eq!( + fmt(&from_sorted), + fmt(&from_shuf), + "sort-agnostic build produced different output for shuffled input" + ); + + // And the keys really are ascending. + let keys: Vec = from_shuf[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(); + assert!( + keys.windows(2).all(|w| w[0] < w[1]), + "output not sorted ascending" + ); +} + +/// Reverse the row order of a batch (helper for the shuffle test). +fn reverse_rows(batch: &RecordBatch) -> RecordBatch { + let n = batch.num_rows(); + let idx = arrow_array::UInt32Array::from((0..n).rev().map(|i| i as u32).collect::>()); + let cols: Vec = batch + .columns() + .iter() + .map(|c| datafusion::arrow::compute::take(c.as_ref(), &idx, None).unwrap()) + .collect(); + RecordBatch::try_new(batch.schema(), cols).unwrap() +} + +#[tokio::test] +async fn reopen_from_file_roundtrips() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(800, 200, 11); + let path = dir.path().join("reopen.feather"); + let value_cols: Vec = (1..schema.fields().len()).collect(); + let mut b = + FeatherSidecarBuilder::begin(path.to_str().unwrap(), schema, 0, value_cols).unwrap(); + for batch in &batches { + b.push_batch(batch).unwrap(); + } + let from_builder = b.finish().unwrap(); + + // Open a fresh provider straight off the published file (the replica path). + let reopened = FeatherLookupProvider::open(path.to_str().unwrap()).unwrap(); + assert_eq!(reopened.len(), rowids.len()); + + let keys: Vec = rowids.iter().step_by(5).map(|&r| r as u64).collect(); + let a = from_builder + .fetch_by_keys(&keys, "rowid", None) + .await + .unwrap(); + let c = reopened.fetch_by_keys(&keys, "rowid", None).await.unwrap(); + assert_eq!(fmt(&a), fmt(&c)); +} + +// ── E/F. Footprint & build-time (informational) ───────────────────────────────── + +#[cfg(feature = "sqlite-provider")] +#[tokio::test] +async fn footprint_and_build_time_vs_sqlite() { + use std::time::Instant; + let dir = tempdir().unwrap(); + let (schema, batches, _rowids) = gen_payload(50_000, 2048, 1234); + + let t0 = Instant::now(); + let _f = build_feather(&dir, schema.clone(), &batches); + let feather_build = t0.elapsed(); + + let t1 = Instant::now(); + let _s = build_sqlite(&dir, schema.clone(), &batches); + let sqlite_build = t1.elapsed(); + + let feather_bytes = std::fs::metadata(dir.path().join("payload.feather")) + .unwrap() + .len(); + let sqlite_bytes = std::fs::metadata(dir.path().join("payload.db")) + .unwrap() + .len(); + + println!( + "[#702 footprint] rows=50000 feather={feather_bytes} bytes ({:.1} MiB) \ + sqlite={sqlite_bytes} bytes ({:.1} MiB) ratio(feather/sqlite)={:.2}", + feather_bytes as f64 / 1048576.0, + sqlite_bytes as f64 / 1048576.0, + feather_bytes as f64 / sqlite_bytes as f64, + ); + println!( + "[#702 build-time] feather={feather_build:?} sqlite={sqlite_build:?} \ + speedup(sqlite/feather)={:.1}x", + sqlite_build.as_secs_f64() / feather_build.as_secs_f64().max(1e-9), + ); + + // Sanity only (not a perf gate): both files exist and are non-trivial. + assert!(feather_bytes > 0 && sqlite_bytes > 0); +} From 1048e15feee72344f2663aceee4e694daba12b65 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Wed, 17 Jun 2026 16:00:37 +0530 Subject: [PATCH 2/5] fix(feather): order build sort in the u64 key domain finish() sorted rows via sort_to_indices on the raw key column (native, i.e. signed, order for Int64/Int32), while the open-time verification and fetch_by_keys binary-search compare keys cast as u64. The two orderings diverge for keys whose i64 and u64 order differ (e.g. negative Int64), so a genuinely-ascending build was rejected by from_batches as 'corrupt'. Argsort the u64-cast keys instead, so the on-disk order, the open-time sortedness check, and the search all use one domain for every supported key type. Tighten the sortedness guard to strictly-ascending (reject duplicate keys, matching SQLite's INTEGER PRIMARY KEY) and factor the null-checked u64 key extraction into a shared helper. Drop stray PoC framing from the module docs. --- src/feather_provider.rs | 77 ++++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/src/feather_provider.rs b/src/feather_provider.rs index 909155b..bada771 100644 --- a/src/feather_provider.rs +++ b/src/feather_provider.rs @@ -18,10 +18,10 @@ // `sql_values_to_arrow` on read, and no rejection/coercion of Decimal / Struct / // Map / FixedSizeList / Dictionary / Timestamp(tz) / nested List. // -// Scope: this is the PoC primitive (ticket #702). The whole file is read into -// memory once at open and held resident (the "expand to uncompressed on NVMe -// before querying" model). mmap demand-paging and a coarse per-batch index for -// multi-GB sidecars are documented follow-ups, not implemented here. +// Scope: the whole file is read into memory once at open and held resident (the +// "expand to uncompressed on NVMe before querying" model). mmap demand-paging and +// a coarse per-batch index for multi-GB sidecars are follow-ups, not implemented +// here. use std::any::Any; use std::fmt; @@ -112,26 +112,19 @@ impl FeatherLookupProvider { .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))? }; - let keys: Vec = extract_keys_as_u64(batch.column(0).as_ref())? - .into_iter() - .map(|k| { - k.ok_or_else(|| { - DataFusionError::Execution( - "FeatherLookupProvider: key column has a null value; \ - row keys must be non-null" - .into(), - ) - }) - }) - .collect::>()?; + let keys: Vec = key_column_as_u64(batch.column(0).as_ref())?; - // Binary search requires ascending keys. The builder guarantees this; - // verify defensively so a bad file fails loudly at open, not silently - // at query time with wrong rows. - if keys.windows(2).any(|w| w[0] > w[1]) { + // Binary search requires strictly-ascending keys: ascending so + // `partition_point` is valid, and unique so a key maps to one physical + // row (mirroring SQLite's `INTEGER PRIMARY KEY`, which rejects duplicate + // keys at build). The builder guarantees this; verify defensively so a + // bad file fails loudly at open, not silently at query time with wrong + // rows. Compared in u64 order to match the build sort and the search. + if keys.windows(2).any(|w| w[0] >= w[1]) { return Err(DataFusionError::Execution(format!( - "FeatherLookupProvider: key column '{key_col}' is not sorted ascending; \ - the sidecar is corrupt or was not built by FeatherSidecarBuilder" + "FeatherLookupProvider: key column '{key_col}' is not strictly ascending \ + (unsorted or duplicate keys); the sidecar is corrupt or was not built by \ + FeatherSidecarBuilder" ))); } @@ -268,8 +261,8 @@ impl TableProvider for FeatherLookupProvider { /// whether a bounded-memory merge (instead of the in-memory sort) is needed at /// production scale. /// -/// **Memory:** the PoC buffers all projected rows before sorting — O(N) resident. -/// A bounded-memory external merge-of-sorted-runs is the documented scale plan; +/// **Memory:** the builder buffers all projected rows before sorting — O(N) +/// resident. A bounded-memory external merge-of-sorted-runs is the scale plan; /// it is not implemented here. /// /// The first field of `schema` is the key column; fields 1.. are the stored @@ -391,13 +384,22 @@ impl FeatherSidecarBuilder { .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))? }; - // Sort ascending by the key column (field 0). Always performed so the - // file is sorted regardless of input order; cheap when already sorted. + // Sort ascending by the key column (field 0), in the SAME u64 domain the + // lookup uses — `extract_keys_as_u64` casts the key `as u64`, and + // `fetch_by_keys`/`from_batches` binary-search and verify in u64 order. A + // raw `sort_to_indices` would instead order by the column's native (for + // Int64/Int32: signed) order, which disagrees with the u64 search for + // keys whose i64 and u64 orderings differ (e.g. negative Int64 values), + // making `from_batches` reject a genuinely-ascending build as "corrupt". + // Argsorting the u64-cast keys keeps the on-disk order, the open()-time + // verification, and the search consistent for every supported key type. let sorted = if combined.num_rows() == 0 { combined } else { - let indices = compute::sort_to_indices(combined.column(0).as_ref(), None, None) - .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + let key_vals = key_column_as_u64(combined.column(0).as_ref())?; + let mut order: Vec = (0..key_vals.len() as u32).collect(); + order.sort_by_key(|&i| key_vals[i as usize]); + let indices = UInt32Array::from(order); sort_batch(&combined, &indices)? }; @@ -414,6 +416,25 @@ impl FeatherSidecarBuilder { } } +/// Lift the key column into a contiguous `Vec`, erroring on a null key (row +/// keys must be non-null). The `as u64` cast inside `extract_keys_as_u64` defines +/// the single key-ordering domain this provider uses everywhere — the build sort, +/// the open-time sortedness check, and the `fetch_by_keys` binary search. +fn key_column_as_u64(col: &dyn Array) -> DFResult> { + extract_keys_as_u64(col)? + .into_iter() + .map(|k| { + k.ok_or_else(|| { + DataFusionError::Execution( + "FeatherLookupProvider: key column has a null value; \ + row keys must be non-null" + .into(), + ) + }) + }) + .collect() +} + /// `take` every column of `batch` by `indices`, preserving the schema. fn sort_batch(batch: &RecordBatch, indices: &UInt32Array) -> DFResult { let cols: Vec = batch From 7a1dd8a3833ee8493a71229de03f967f1f8a2bd4 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Wed, 17 Jun 2026 16:00:37 +0530 Subject: [PATCH 3/5] test(feather): cover open errors, scan, and more key types Add tests for the open() rejection paths (unsorted / duplicate keys, malformed and missing files), the TableProvider::scan round-trip, UInt64 key columns, and a regression test that negative-but-ascending Int64 keys build and round-trip through the u64 lookup domain. --- tests/feather_provider_test.rs | 199 ++++++++++++++++++++++++++++++++- 1 file changed, 198 insertions(+), 1 deletion(-) diff --git a/tests/feather_provider_test.rs b/tests/feather_provider_test.rs index 19fbf09..188d364 100644 --- a/tests/feather_provider_test.rs +++ b/tests/feather_provider_test.rs @@ -17,7 +17,7 @@ use arrow_array::builder::{FixedSizeListBuilder, Float64Builder, ListBuilder}; use arrow_array::types::Int32Type; use arrow_array::{ Array, ArrayRef, Decimal128Array, DictionaryArray, Int64Array, RecordBatch, StringArray, - StructArray, TimestampMicrosecondArray, + StructArray, TimestampMicrosecondArray, UInt64Array, }; use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit}; use datafusion::arrow::util::pretty::pretty_format_batches; @@ -608,3 +608,200 @@ async fn footprint_and_build_time_vs_sqlite() { // Sanity only (not a perf gate): both files exist and are non-trivial. assert!(feather_bytes > 0 && sqlite_bytes > 0); } + +// ── open() error paths & robustness ───────────────────────────────────────────── + +/// Write a batch to a `.feather` file directly (bypassing the builder's sort), so +/// we can construct deliberately-malformed sidecars for the rejection tests. +fn write_raw_feather(path: &str, batch: &RecordBatch) { + let file = std::fs::File::create(path).unwrap(); + let mut w = arrow_ipc::writer::FileWriter::try_new(file, &batch.schema()).unwrap(); + w.write(batch).unwrap(); + w.finish().unwrap(); +} + +/// `open` must fail loudly (not silently return wrong rows) when the stored key +/// column is not strictly ascending — both the unsorted and duplicate-key cases. +/// This guards the "fail at open, not at query time" contract the binary search +/// depends on. +#[test] +fn open_rejects_unsorted_or_duplicate_keys() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + + let unsorted = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![10_i64, 3, 1])), + Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])), + ], + ) + .unwrap(); + let p1 = dir.path().join("unsorted.feather"); + write_raw_feather(p1.to_str().unwrap(), &unsorted); + assert!( + FeatherLookupProvider::open(p1.to_str().unwrap()).is_err(), + "descending key column must be rejected" + ); + + // Duplicate keys must also be rejected (mirrors SQLite's INTEGER PRIMARY KEY). + let dup = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int64Array::from(vec![1_i64, 1, 2])), + Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])), + ], + ) + .unwrap(); + let p2 = dir.path().join("dup.feather"); + write_raw_feather(p2.to_str().unwrap(), &dup); + assert!( + FeatherLookupProvider::open(p2.to_str().unwrap()).is_err(), + "duplicate keys must be rejected" + ); +} + +/// `open` returns an error (never panics) on a non-Arrow file and on a missing +/// file, rather than surfacing a corrupt provider. +#[test] +fn open_rejects_malformed_or_missing_file() { + let dir = tempdir().unwrap(); + let garbage = dir.path().join("garbage.feather"); + std::fs::write(&garbage, b"this is not an arrow ipc file").unwrap(); + assert!(FeatherLookupProvider::open(garbage.to_str().unwrap()).is_err()); + + let missing = dir.path().join("does_not_exist.feather"); + assert!(FeatherLookupProvider::open(missing.to_str().unwrap()).is_err()); +} + +/// Regression for the build-sort domain fix: a genuinely-ascending Int64 key +/// column containing negative values must BUILD (not be rejected as "corrupt") +/// and round-trip via the u64 lookup domain the engine uses for rowids. Before +/// the fix, `finish()` sorted by signed Arrow order while the verify/search used +/// u64 order, so this input was spuriously rejected. +#[tokio::test] +async fn signed_negative_keys_build_and_query() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![-5_i64, -1, 3, 10])), + Arc::new(StringArray::from(vec![ + Some("neg5"), + Some("neg1"), + Some("three"), + Some("ten"), + ])), + ], + ) + .unwrap(); + let path = dir.path().join("neg.feather"); + let mut b = FeatherSidecarBuilder::begin(path.to_str().unwrap(), schema, 0, vec![1]).unwrap(); + b.push_batch(&batch).unwrap(); + let provider = b + .finish() + .expect("negative-but-ascending Int64 keys must build"); + + // The engine passes rowids as u64; a negative i64 arrives as its u64 reinterpretation. + let out = provider + .fetch_by_keys(&[(-5_i64) as u64, 3], "rowid", None) + .await + .unwrap(); + let names: Vec = out + .iter() + .flat_map(|bch| { + bch.column(1) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .map(|s| s.to_string()) + }) + .collect(); + assert_eq!(names.len(), 2); + assert!(names.contains(&"neg5".to_string()) && names.contains(&"three".to_string())); +} + +/// UInt64 key column (the other common rowid type) round-trips. Only Int64 was +/// covered before; this exercises the UInt64 arm of `extract_keys_as_u64`. +#[tokio::test] +async fn uint64_keys_roundtrip() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::UInt64, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt64Array::from(vec![5_u64, 50, 500, 5000])), + Arc::new(StringArray::from(vec![ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + ])), + ], + ) + .unwrap(); + let path = dir.path().join("u64.feather"); + let mut b = FeatherSidecarBuilder::begin(path.to_str().unwrap(), schema, 0, vec![1]).unwrap(); + b.push_batch(&batch).unwrap(); + let provider = b.finish().unwrap(); + + let out = provider + .fetch_by_keys(&[50, 5000], "rowid", None) + .await + .unwrap(); + let names: Vec = out[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .map(|s| s.to_string()) + .collect(); + assert_eq!(names, vec!["b".to_string(), "d".to_string()]); +} + +/// The `TableProvider::scan` path (used for SQL column-name resolution) actually +/// returns rows — a working MemTable scan, unlike the parquet sibling which is +/// NotImplemented. Guards against the silent zero-row scan that bit parquet. +#[tokio::test] +async fn table_provider_scan_roundtrips() { + use datafusion::catalog::TableProvider; + use datafusion::prelude::SessionContext; + + let dir = tempdir().unwrap(); + let (schema, batches, _rowids) = gen_payload(300, 128, 21); + let provider = build_feather(&dir, schema, &batches); + let n = provider.len(); + + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::new(provider) as Arc) + .unwrap(); + + let all = ctx + .sql("SELECT rowid, title FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); + let total: usize = all.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total, n, "full scan should see every row"); + assert_eq!( + all[0].num_columns(), + 2, + "projected scan returns only 2 columns" + ); +} From 5202897e75e1266b668e2c7dbafdf98b308deb1c Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Wed, 17 Jun 2026 16:51:55 +0530 Subject: [PATCH 4/5] feat(feather): mmap read + bounded spill-sort build Make FeatherLookupProvider memory-comparable to the SQLite sidecar (which pages its B-tree) instead of fully resident: - READ: mmap the file and decode Arrow IPC zero-copy (FileDecoder over a Buffer backed by the mapping). Column buffers point into the mapped file, so a scattered take() faults in only the touched pages; only the K-row output is heap-allocated. A first-key-per-batch coarse index narrows each lookup to one (now multi-batch) file segment before the in-batch binary search. - BUILD: bounded-memory external sort. push_batch spills rows UNSORTED to a temp IPC file (O(one batch) payload resident), keeping only an O(N)*8B key index in memory; finish argsorts the keys and gathers rows in sorted order from the mmap'd temp via interleave, in bounded chunks, into the final sorted file. No full-payload buffering and no dependency on the input scan's row order (the DuckLake rowid scan order is not reliable). Also: reject out-of-range projection indices with a DataFusionError instead of panicking (matches the sibling providers; addresses the PR #28 nit), and reject duplicate keys at build/open. Adds a multi-batch shuffled-build parity test exercising the coarse index + cross-batch gather. New deps: memmap2, tempfile. --- Cargo.lock | 10 + Cargo.toml | 11 +- src/feather_provider.rs | 658 ++++++++++++++++++++------------- tests/feather_provider_test.rs | 56 +++ 4 files changed, 480 insertions(+), 255 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67710bf..5aec148 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1411,6 +1411,7 @@ dependencies = [ "bytes", "datafusion", "futures", + "memmap2", "object_store", "parquet", "rusqlite", @@ -2113,6 +2114,15 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "memmap2" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714098028fe011992e1c3962653c96b2d578c4b4bce9036e15ff220319b1e0e3" +dependencies = [ + "libc", +] + [[package]] name = "miniz_oxide" version = "0.8.9" diff --git a/Cargo.toml b/Cargo.toml index c7ad244..363e913 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0" [features] parquet-provider = ["dep:parquet", "dep:object_store", "dep:bytes"] sqlite-provider = ["dep:rusqlite", "dep:serde_json", "dep:parquet"] -feather-provider = ["dep:arrow-ipc"] +feather-provider = ["dep:arrow-ipc", "dep:memmap2", "dep:tempfile"] [dependencies] tracing = "0.1" @@ -30,10 +30,13 @@ bytes = { version = "1", optional = true } rusqlite = { version = "0.32", optional = true, features = ["bundled"] } serde_json = { version = "1", optional = true } -# feather-provider — Arrow IPC (Feather v2) read/write. `take`, `sort_to_indices`, -# and `concat_batches` come from `datafusion::arrow::compute`, so the only extra -# dep is the IPC reader/writer. +# feather-provider — Arrow IPC (Feather v2). `take`/`concat_batches`/`interleave` +# come from `datafusion::arrow::compute`; the extra deps are the IPC reader/writer, +# `memmap2` for the zero-copy mmap read path, and `tempfile` for the bounded-memory +# (spill-sort) build path. arrow-ipc = { version = "58", optional = true } +memmap2 = { version = "0.9", optional = true } +tempfile = { version = "3", optional = true } [dev-dependencies] tokio = { version = "1", features = ["rt-multi-thread", "macros"] } diff --git a/src/feather_provider.rs b/src/feather_provider.rs index bada771..34521dd 100644 --- a/src/feather_provider.rs +++ b/src/feather_provider.rs @@ -1,35 +1,45 @@ // feather_provider.rs — Arrow/Feather (Arrow IPC) positional PointLookupProvider. // // A drop-in alternative to `SqliteLookupProvider` for the DuckLake vector-search -// payload sidecar. Where SQLite stores rows in a B-tree keyed by `rowid` and -// hydrates via `WHERE rowid IN (...)`, this provider stores the payload as a -// single Arrow IPC file sorted ascending by the (sparse / holey) `rowid`, and -// hydrates by: +// payload sidecar. The payload is stored as an Arrow IPC file sorted ascending by +// the (sparse / holey) `rowid` key. Hydration is: // -// 1. binary-searching the sorted `rowid` column for each requested key -// (`slice::partition_point`), -// 2. guarding with an exact-match check (`rowid[pos] == key`) so a missing -// rowid never aliases its neighbour, then -// 3. `take`-ing the matched physical positions out of the payload columns. +// 1. a coarse per-batch first-key index narrows to the batch that may hold a +// requested key, +// 2. `slice::partition_point` binary-searches that batch's rowid column, +// 3. an exact-match guard (`rowid[pos] == key`) rejects a missing rowid, then +// 4. `take` pulls the matched physical rows out of the payload columns. // -// The sorted `rowid` column *is* the index — no separate structure. Because the -// payload is stored verbatim Arrow, hydration is `select(proj).take(positions)` -// with zero type conversion: no `arrow_cell_to_sql` on build, no -// `sql_values_to_arrow` on read, and no rejection/coercion of Decimal / Struct / -// Map / FixedSizeList / Dictionary / Timestamp(tz) / nested List. +// Memory parity with SQLite (which pages its B-tree) comes from two pieces: // -// Scope: the whole file is read into memory once at open and held resident (the -// "expand to uncompressed on NVMe before querying" model). mmap demand-paging and -// a coarse per-batch index for multi-GB sidecars are follow-ups, not implemented -// here. +// * READ: the file is `mmap`'d and the Arrow arrays are decoded ZERO-COPY +// (`FileDecoder` over a `Buffer` backed by the mapping), so column buffers +// point into the mapped file. `take` over scattered keys faults in only the +// touched pages; only the small K-row output is heap-allocated. Resident +// payload memory is bounded by the working set, not the file size. +// +// * BUILD: a bounded-memory external (spill) sort. Rows stream UNSORTED to a +// temp IPC file (O(one batch) payload resident); only an O(N)·8 B key index +// is kept in memory. At `finish` the keys are argsorted and the rows are +// gathered in sorted order from the mmap'd temp via `interleave`, in bounded +// chunks, into the final sorted file. No full-payload buffering, and no +// assumption about the input scan's row order. +// +// Because the payload is stored verbatim Arrow, there is zero type conversion: +// Decimal / Struct / Map / FixedSizeList / Dictionary / Timestamp(tz) / nested +// List all round-trip losslessly (unlike the SQLite provider, which rejects or +// coerces them). use std::any::Any; use std::fmt; +use std::fs::File; +use std::ptr::NonNull; use std::sync::Arc; -use arrow_array::{Array, ArrayRef, RecordBatch, UInt32Array, UInt64Array}; -use arrow_schema::SchemaRef; +use arrow_array::{Array, ArrayRef, Int32Array, Int64Array, RecordBatch, UInt32Array, UInt64Array}; +use arrow_schema::{Schema, SchemaRef}; use async_trait::async_trait; +use datafusion::arrow::buffer::Buffer; use datafusion::arrow::compute; use datafusion::catalog::{Session, TableProvider}; use datafusion::common::Result as DFResult; @@ -40,62 +50,171 @@ use datafusion::physical_plan::ExecutionPlan; use crate::lookup::{PointLookupProvider, extract_keys_as_u64}; +/// Rows per output batch when gathering the sorted result at build time. Bounds +/// the per-chunk `interleave` working set. +const BUILD_CHUNK_ROWS: usize = 8192; + +// ── mmap + zero-copy IPC decode ───────────────────────────────────────────────── + +/// Memory-map `path` and decode every Arrow IPC record batch ZERO-COPY: the +/// returned arrays' buffers point into the mapping (kept alive by the returned +/// [`Buffer`]'s custom allocation owner), so nothing is copied onto the heap. +fn mmap_feather(path: &str) -> DFResult<(SchemaRef, Vec, Buffer)> { + use arrow_ipc::convert::fb_to_schema; + use arrow_ipc::reader::{FileDecoder, read_footer_length}; + use arrow_ipc::root_as_footer; + + let file = File::open(path) + .map_err(|e| DataFusionError::Execution(format!("open feather sidecar {path}: {e}")))?; + // SAFETY: the file is treated as immutable for the provider's lifetime (built + // once, then read-only). The mapping is owned by the `Buffer` below. + let mmap = unsafe { memmap2::Mmap::map(&file) } + .map_err(|e| DataFusionError::Execution(format!("mmap feather sidecar {path}: {e}")))?; + let len = mmap.len(); + if len < 10 { + return Err(DataFusionError::Execution(format!( + "feather sidecar {path} is too small to be a valid Arrow IPC file ({len} bytes)" + ))); + } + let ptr = NonNull::new(mmap.as_ptr() as *mut u8).ok_or_else(|| { + DataFusionError::Execution(format!("feather sidecar {path}: null mmap pointer")) + })?; + // SAFETY: `ptr`/`len` describe the mapping, and the `Arc` owner keeps it + // alive for as long as any slice of this Buffer (or array derived from it) lives. + let buffer = unsafe { Buffer::from_custom_allocation(ptr, len, Arc::new(mmap)) }; + + let trailer_start = len - 10; + let footer_len = read_footer_length(buffer[trailer_start..].try_into().map_err(|_| { + DataFusionError::Execution(format!("feather sidecar {path}: bad IPC trailer")) + })?) + .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))?; + let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]) + .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))?; + + let schema: SchemaRef = Arc::new(fb_to_schema(footer.schema().ok_or_else(|| { + DataFusionError::Execution(format!("feather sidecar {path}: footer has no schema")) + })?)); + + let mut decoder = FileDecoder::new(schema.clone(), footer.version()); + for block in footer.dictionaries().iter().flatten() { + let block_len = block.bodyLength() as usize + block.metaDataLength() as usize; + let data = buffer.slice_with_length(block.offset() as usize, block_len); + decoder + .read_dictionary(block, &data) + .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))?; + } + + let mut batches = Vec::new(); + if let Some(record_batches) = footer.recordBatches() { + for block in record_batches { + let block_len = block.bodyLength() as usize + block.metaDataLength() as usize; + let data = buffer.slice_with_length(block.offset() as usize, block_len); + if let Some(batch) = decoder + .read_record_batch(block, &data) + .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))? + { + batches.push(batch); + } + } + } + Ok((schema, batches, buffer)) +} + +// ── Key helpers (u64 ordering domain — matches `extract_keys_as_u64`) ─────────── + +/// Read the key at `row` as `u64`. Supports the same integer key types as +/// `extract_keys_as_u64` (Int64/UInt64/Int32/UInt32). +fn key_at(col: &dyn Array, row: usize) -> DFResult { + let any = col.as_any(); + if let Some(a) = any.downcast_ref::() { + return Ok(a.value(row) as u64); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.value(row)); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.value(row) as u64); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.value(row) as u64); + } + Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: key column type {:?} is not supported; use Int64/UInt64/Int32/UInt32", + col.data_type() + ))) +} + +/// First index `i` in `col` whose key (compared as `u64`) is `>= target`. +/// Binary search over the sorted key column — touches only the pages it reads. +fn partition_point_u64(col: &dyn Array, target: u64) -> DFResult { + let any = col.as_any(); + if let Some(a) = any.downcast_ref::() { + return Ok(a.values().partition_point(|&v| (v as u64) < target)); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.values().partition_point(|&v| v < target)); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.values().partition_point(|&v| (v as u64) < target)); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.values().partition_point(|&v| (v as u64) < target)); + } + Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: key column type {:?} is not supported; use Int64/UInt64/Int32/UInt32", + col.data_type() + ))) +} + // ── Provider ────────────────────────────────────────────────────────────────── -/// In-memory, sorted-by-key Arrow positional [`PointLookupProvider`]. +/// mmap-backed, sorted-by-key Arrow positional [`PointLookupProvider`]. /// -/// Holds the full payload as one concatenated [`RecordBatch`] (sorted ascending -/// by the key column, which is field 0 of the schema) plus a contiguous -/// `Vec` of the key values for binary search. Built by -/// [`FeatherSidecarBuilder`] or opened from an existing `.feather` file with +/// Holds the payload as zero-copy [`RecordBatch`]es referencing the mmap, in +/// ascending-key order, plus a tiny first-key-per-batch coarse index. Built by +/// [`FeatherSidecarBuilder`] or opened from a `.feather` file with /// [`open`](Self::open). pub struct FeatherLookupProvider { schema: SchemaRef, - /// Payload rows, sorted ascending by `keys[i]`. Row `i` of every column - /// corresponds to `keys[i]`. - batch: RecordBatch, - /// Sorted ascending key values, one per row of `batch`. The contiguous - /// `rowid` index: `partition_point` over this maps a key → physical row. - keys: Arc>, + /// Payload batches, ascending by key and non-overlapping across batches. + /// Arrays reference the mmap (see `_mmap`). + batches: Vec, + /// Coarse index: first key of `batches[i]`, ascending. Narrows a lookup to a + /// single batch before the in-batch binary search. + batch_first_key: Vec, + /// Keeps the memory mapping alive; the batch arrays' buffers point into it. + _mmap: Buffer, key_col: String, + n_rows: usize, } impl fmt::Debug for FeatherLookupProvider { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "FeatherLookupProvider(key_col={}, rows={}, schema_cols={})", + "FeatherLookupProvider(key_col={}, rows={}, batches={}, schema_cols={})", self.key_col, - self.keys.len(), + self.n_rows, + self.batches.len(), self.schema.fields().len() ) } } impl FeatherLookupProvider { - /// Open an existing `.feather` (Arrow IPC file) sidecar. + /// Open an existing `.feather` (Arrow IPC file) sidecar via mmap. /// - /// Reads every batch, concatenates them into one resident batch, and lifts - /// the key column (field 0) into a contiguous `Vec`. The on-disk schema - /// is self-describing, so the provider's [`schema`](PointLookupProvider::schema) - /// is taken from the file verbatim. Fails if the stored key column is not - /// sorted ascending — `fetch_by_keys`' binary search depends on it, and the - /// builder always writes sorted, so an unsorted file signals corruption. + /// The on-disk schema is self-describing, so the provider's + /// [`schema`](PointLookupProvider::schema) is taken from the file. Fails if + /// the stored key column is not strictly ascending — the binary search + /// depends on it, and the builder always writes it sorted, so a violation + /// signals a corrupt or foreign file. pub fn open(path: &str) -> DFResult { - let file = std::fs::File::open(path) - .map_err(|e| DataFusionError::Execution(format!("open feather sidecar {path}: {e}")))?; - let reader = arrow_ipc::reader::FileReader::try_new(file, None) - .map_err(|e| DataFusionError::Execution(format!("read feather sidecar {path}: {e}")))?; - let schema = reader.schema(); - let batches = reader.collect::, _>>().map_err(|e| { - DataFusionError::Execution(format!("decode feather sidecar {path}: {e}")) - })?; - Self::from_batches(schema, batches) + let (schema, batches, mmap) = mmap_feather(path)?; + Self::from_parts(schema, batches, mmap) } - /// Build a provider from already-decoded, sorted-by-key batches. Shared by - /// [`open`](Self::open) and [`FeatherSidecarBuilder::finish`]. - fn from_batches(schema: SchemaRef, batches: Vec) -> DFResult { + fn from_parts(schema: SchemaRef, batches: Vec, mmap: Buffer) -> DFResult { if schema.fields().is_empty() { return Err(DataFusionError::Execution( "FeatherLookupProvider: schema has no columns; field 0 must be the key column" @@ -104,43 +223,67 @@ impl FeatherLookupProvider { } let key_col = schema.field(0).name().clone(); - // One contiguous batch so `take` addresses a global physical position. - let batch = if batches.is_empty() { - RecordBatch::new_empty(schema.clone()) - } else { - compute::concat_batches(&schema, &batches) - .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))? - }; - - let keys: Vec = key_column_as_u64(batch.column(0).as_ref())?; - - // Binary search requires strictly-ascending keys: ascending so - // `partition_point` is valid, and unique so a key maps to one physical - // row (mirroring SQLite's `INTEGER PRIMARY KEY`, which rejects duplicate - // keys at build). The builder guarantees this; verify defensively so a - // bad file fails loudly at open, not silently at query time with wrong - // rows. Compared in u64 order to match the build sort and the search. - if keys.windows(2).any(|w| w[0] >= w[1]) { - return Err(DataFusionError::Execution(format!( - "FeatherLookupProvider: key column '{key_col}' is not strictly ascending \ - (unsorted or duplicate keys); the sidecar is corrupt or was not built by \ - FeatherSidecarBuilder" - ))); + // Build the coarse index and verify the global ascending+unique invariant + // the binary search relies on (within each batch and across batches). + let mut batch_first_key: Vec = Vec::with_capacity(batches.len()); + let mut n_rows = 0usize; + let mut prev_last: Option = None; + let mut kept: Vec = Vec::with_capacity(batches.len()); + for batch in batches { + let nrows = batch.num_rows(); + if nrows == 0 { + continue; + } + let keycol = batch.column(0).as_ref(); + if keycol.null_count() > 0 { + return Err(DataFusionError::Execution( + "FeatherLookupProvider: key column has a null value; row keys must be non-null" + .into(), + )); + } + let first = key_at(keycol, 0)?; + // Strictly ascending within the batch. + let mut prev = first; + for r in 1..nrows { + let k = key_at(keycol, r)?; + if k <= prev { + return Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: key column '{key_col}' is not strictly ascending \ + (unsorted or duplicate keys); the sidecar is corrupt or was not built by \ + FeatherSidecarBuilder" + ))); + } + prev = k; + } + // Strictly ascending across the batch boundary. + if prev_last.is_some_and(|pl| first <= pl) { + return Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: key column '{key_col}' is not strictly ascending \ + across batches; the sidecar is corrupt or was not built by \ + FeatherSidecarBuilder" + ))); + } + prev_last = Some(prev); + batch_first_key.push(first); + n_rows += nrows; + kept.push(batch); } Ok(Self { schema, - batch, - keys: Arc::new(keys), + batches: kept, + batch_first_key, + _mmap: mmap, key_col, + n_rows, }) } pub fn len(&self) -> usize { - self.keys.len() + self.n_rows } pub fn is_empty(&self) -> bool { - self.keys.is_empty() + self.n_rows == 0 } } @@ -156,67 +299,93 @@ impl PointLookupProvider for FeatherLookupProvider { _key_col: &str, projection: Option<&[usize]>, ) -> DFResult> { - if keys.is_empty() { + if keys.is_empty() || self.batches.is_empty() { return Ok(vec![]); } - // Sort + dedup the requested keys. Sorting makes the resulting physical - // positions ascending, so the output is ordered by key — matching - // SQLite's `ORDER BY rowid`. Dedup matches `WHERE key IN (...)`, which - // returns one row per distinct key regardless of repeats. + // Validate the projection up front and gracefully (like + // `RecordBatch::project`), rather than panicking on an out-of-range index. + let nfields = self.schema.fields().len(); + let proj: Vec = match projection { + None => (0..nfields).collect(), + Some(idxs) => { + if let Some(&bad) = idxs.iter().find(|&&i| i >= nfields) { + return Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: projection index {bad} out of bounds for schema \ + with {nfields} columns" + ))); + } + idxs.to_vec() + } + }; + let out_schema: SchemaRef = Arc::new(Schema::new( + proj.iter() + .map(|&i| self.schema.field(i).clone()) + .collect::>(), + )); + + // Sort + dedup the requested keys: sorting yields ascending output + // (matching SQLite's `ORDER BY rowid`); dedup matches `WHERE key IN (...)`. let mut want = keys.to_vec(); want.sort_unstable(); want.dedup(); - // Map each requested key to a physical position via binary search, with - // an exact-match guard so an absent key is skipped rather than aliased - // onto its lower-bound neighbour. - let mut positions: Vec = Vec::with_capacity(want.len()); + // Resolve each present key to (batch index, local row), in ascending key + // order. The coarse index narrows to a batch; partition_point + exact + // match locate the row within it. + let mut matches: Vec<(usize, u32)> = Vec::with_capacity(want.len()); for &k in &want { - let pos = self.keys.partition_point(|&v| v < k); - if pos < self.keys.len() && self.keys[pos] == k { - positions.push(pos as u64); + let cb = self.batch_first_key.partition_point(|&fk| fk <= k); + if cb == 0 { + continue; // below the smallest stored key + } + let b = cb - 1; + let keycol = self.batches[b].column(0).as_ref(); + let pos = partition_point_u64(keycol, k)?; + if pos < self.batches[b].num_rows() && key_at(keycol, pos)? == k { + matches.push((b, pos as u32)); } } - if positions.is_empty() { + if matches.is_empty() { return Ok(vec![]); } - // Columns to read, in output order. Projection indexes into the provider - // schema (0 = key column), mirroring the SQLite provider's contract. - let col_indices: Vec = match projection { - None => (0..self.schema.fields().len()).collect(), - Some(idxs) => idxs.to_vec(), - }; - let out_schema: SchemaRef = match projection { - None => self.schema.clone(), - Some(idxs) => Arc::new(arrow_schema::Schema::new( - idxs.iter() - .map(|&i| self.schema.field(i).clone()) - .collect::>(), - )), - }; - - let pos_arr = UInt64Array::from(positions); - let cols: Vec = col_indices - .iter() - .map(|&i| { - compute::take(self.batch.column(i).as_ref(), &pos_arr, None) - .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) - }) - .collect::>()?; + // `matches` is grouped by batch (ascending k → non-decreasing batch idx). + // `take` each batch's projected columns once, then concat into one batch. + let mut out_batches: Vec = Vec::new(); + let mut j = 0; + while j < matches.len() { + let b = matches[j].0; + let mut locals: Vec = Vec::new(); + while j < matches.len() && matches[j].0 == b { + locals.push(matches[j].1); + j += 1; + } + let idx = UInt32Array::from(locals); + let cols: Vec = proj + .iter() + .map(|&c| { + compute::take(self.batches[b].column(c).as_ref(), &idx, None) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + }) + .collect::>()?; + out_batches.push( + RecordBatch::try_new(out_schema.clone(), cols) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?, + ); + } - let batch = RecordBatch::try_new(out_schema, cols) + let combined = compute::concat_batches(&out_schema, &out_batches) .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; - Ok(vec![batch]) + Ok(vec![combined]) } } // ── TableProvider ───────────────────────────────────────────────────────────── // -// Mirrors `SqliteLookupProvider` / `HashKeyProvider`: the payload is already -// resident, so a full scan is a cheap MemTable over the single batch. This lets -// DataFusion resolve column names when the provider is registered as a table. +// The payload is already addressable (mmap'd batches), so a full scan is a cheap +// MemTable over them. Lets DataFusion resolve column names when the provider is +// registered as a table. #[async_trait] impl TableProvider for FeatherLookupProvider { @@ -236,59 +405,50 @@ impl TableProvider for FeatherLookupProvider { _filters: &[Expr], _limit: Option, ) -> DFResult> { - let mem = MemTable::try_new(self.schema.clone(), vec![vec![self.batch.clone()]])?; + let mem = MemTable::try_new(self.schema.clone(), vec![self.batches.clone()])?; mem.scan(state, projection, &[], None).await } } -// ── Streaming sidecar builder ─────────────────────────────────────────────── +// ── Streaming sidecar builder (bounded-memory external sort) ───────────────────── /// Incremental builder for a [`FeatherLookupProvider`], the Feather analogue of /// [`SqliteSidecarBuilder`](crate::sqlite_provider::SqliteSidecarBuilder). /// /// Takes input [`RecordBatch`]es one at a time (e.g. from the DuckLake -/// snapshot-pinned, row-lineage scan), reading each row's key from a designated -/// column and projecting the value columns into the output schema. On -/// [`finish`](Self::finish) the buffered rows are sorted ascending by key and -/// written as one Arrow IPC batch. -/// -/// **Sort-agnostic by design.** Unlike the SQLite B-tree (which is order -/// independent), Feather binary search requires the on-disk key column sorted. -/// Rather than depend on the DuckLake scan emitting sorted rowids — the ticket's -/// top open risk — this builder sorts at `finish`, so correctness holds for any -/// input order. Whether the input was *already* sorted is tracked and reported -/// via [`input_was_sorted`](Self::input_was_sorted), feeding the decision on -/// whether a bounded-memory merge (instead of the in-memory sort) is needed at -/// production scale. +/// snapshot-pinned, row-lineage scan), reads each row's key from a designated +/// column, and projects the value columns into the output schema. /// -/// **Memory:** the builder buffers all projected rows before sorting — O(N) -/// resident. A bounded-memory external merge-of-sorted-runs is the scale plan; -/// it is not implemented here. +/// **Bounded memory, order-agnostic.** Feather binary search needs the on-disk +/// key column sorted, but — unlike SQLite's order-independent B-tree — the input +/// scan's row order is not guaranteed (DuckLake `rowid` order from the scan is +/// not reliable). Rather than buffer the whole payload and sort it (O(N) +/// resident) or depend on the scan emitting sorted rowids, this builder spills: +/// [`push_batch`](Self::push_batch) writes rows UNSORTED to a temp IPC file +/// (O(one batch) payload resident), keeping only an O(N)·8 B key index in memory; +/// [`finish`](Self::finish) argsorts the keys and gathers the rows in sorted +/// order from the mmap'd temp via `interleave`, in bounded chunks, into the final +/// sorted file. Duplicate keys are rejected (mirroring SQLite's primary key). /// -/// The first field of `schema` is the key column; fields 1.. are the stored -/// value columns. `key_col_index` / `value_col_indices` index into the *input* -/// batches passed to [`push_batch`](Self::push_batch) (matching -/// [`SqliteSidecarBuilder::begin`](crate::sqlite_provider::SqliteSidecarBuilder::begin)). +/// The first field of `schema` is the key column; fields 1.. are the stored value +/// columns. `key_col_index` / `value_col_indices` index the *input* batches. pub struct FeatherSidecarBuilder { - path: String, + final_path: String, schema: SchemaRef, key_col_index: usize, value_col_indices: Vec, - /// Projected batches conforming to `schema`, accumulated across push_batch. - buffered: Vec, - /// Largest key seen so far, to detect whether the input stream is already - /// globally sorted ascending. + /// Unsorted spill file; rows are appended in push order. Auto-removed on drop. + temp: tempfile::NamedTempFile, + writer: arrow_ipc::writer::FileWriter, + /// Key of each row, in push (spill) order — the only O(N) in-memory state. + keys: Vec, last_key: Option, input_sorted: bool, } impl FeatherSidecarBuilder { - /// Begin a build targeting `path` (the output `.feather` file). - /// - /// `schema` is the output schema — field 0 is the key column, fields 1.. are - /// the stored value columns, verbatim Arrow (no type validation: storing - /// types SQLite rejects is the point). `key_col_index` and - /// `value_col_indices` index into the input batches. + /// Begin a build targeting `path` (the output `.feather` file). Opens the + /// temp spill file immediately. pub fn begin( path: &str, schema: SchemaRef, @@ -308,21 +468,28 @@ impl FeatherSidecarBuilder { value_col_indices.len() ))); } + let temp = tempfile::NamedTempFile::new() + .map_err(|e| DataFusionError::Execution(format!("create feather spill file: {e}")))?; + let spill = temp + .reopen() + .map_err(|e| DataFusionError::Execution(format!("open feather spill file: {e}")))?; + let writer = arrow_ipc::writer::FileWriter::try_new(spill, &schema) + .map_err(|e| DataFusionError::Execution(format!("init feather spill writer: {e}")))?; Ok(Self { - path: path.to_string(), + final_path: path.to_string(), schema, key_col_index, value_col_indices, - buffered: Vec::new(), + temp, + writer, + keys: Vec::new(), last_key: None, input_sorted: true, }) } - /// Project and buffer every row of `batch`. The key column is read from - /// `key_col_index`; value columns from `value_col_indices`, in order, into - /// schema fields 1.. . Type mismatches between the input columns and the - /// declared schema surface here (via `RecordBatch::try_new`). + /// Project and spill every row of `batch` (unsorted), recording each row's + /// key. Peak memory is O(one batch) plus the running key index. pub fn push_batch(&mut self, batch: &RecordBatch) -> DFResult<()> { let ncols = batch.num_columns(); if self.key_col_index >= ncols { @@ -344,17 +511,15 @@ impl FeatherSidecarBuilder { .into(), )); } - - // Track global sortedness while we have the keys in hand. - let keys = extract_keys_as_u64(key_col.as_ref())?; - for k in keys.into_iter().flatten() { + for k in extract_keys_as_u64(key_col.as_ref())?.into_iter().flatten() { if self.last_key.is_some_and(|prev| k < prev) { self.input_sorted = false; } self.last_key = Some(k); + self.keys.push(k); } - // Project input columns into output-schema order: key first, then values. + // Project input columns to output-schema order: key first, then values. let mut cols: Vec = Vec::with_capacity(self.value_col_indices.len() + 1); cols.push(batch.column(self.key_col_index).clone()); for &ci in &self.value_col_indices { @@ -362,108 +527,99 @@ impl FeatherSidecarBuilder { } let projected = RecordBatch::try_new(self.schema.clone(), cols) .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; - self.buffered.push(projected); + self.writer + .write(&projected) + .map_err(|e| DataFusionError::Execution(format!("spill feather batch: {e}")))?; Ok(()) } - /// Whether every row pushed so far arrived in ascending key order (i.e. the - /// in-memory sort at `finish` was a no-op). Informational: feeds the - /// production decision on whether the DuckLake scan can be relied on to emit - /// sorted rowids (skipping the sort) or needs a merge step. + /// Whether every row arrived in ascending key order (i.e. the spill was + /// already sorted). Informational. pub fn input_was_sorted(&self) -> bool { self.input_sorted } - /// Sort buffered rows ascending by key, write them as a single Arrow IPC - /// batch to `path`, and open a [`FeatherLookupProvider`] over the result. - pub fn finish(self) -> DFResult { - let combined = if self.buffered.is_empty() { - RecordBatch::new_empty(self.schema.clone()) - } else { - compute::concat_batches(&self.schema, &self.buffered) - .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))? - }; + /// Sort by key and write the final sorted `.feather`, then open a provider + /// over it. Bounded memory: argsort the key index, then gather rows from the + /// mmap'd spill via `interleave` in `BUILD_CHUNK_ROWS` chunks. + pub fn finish(mut self) -> DFResult { + self.writer + .finish() + .map_err(|e| DataFusionError::Execution(format!("finalize feather spill: {e}")))?; + + let n = self.keys.len(); + // Stable argsort of the keys (u64 domain), then reject any duplicate — + // strictly-ascending is the provider's invariant. + let mut order: Vec = (0..n as u32).collect(); + order.sort_by_key(|&i| self.keys[i as usize]); + for w in order.windows(2) { + if self.keys[w[0] as usize] >= self.keys[w[1] as usize] { + return Err(DataFusionError::Execution( + "FeatherSidecarBuilder: duplicate row key; keys must be unique".into(), + )); + } + } - // Sort ascending by the key column (field 0), in the SAME u64 domain the - // lookup uses — `extract_keys_as_u64` casts the key `as u64`, and - // `fetch_by_keys`/`from_batches` binary-search and verify in u64 order. A - // raw `sort_to_indices` would instead order by the column's native (for - // Int64/Int32: signed) order, which disagrees with the u64 search for - // keys whose i64 and u64 orderings differ (e.g. negative Int64 values), - // making `from_batches` reject a genuinely-ascending build as "corrupt". - // Argsorting the u64-cast keys keeps the on-disk order, the open()-time - // verification, and the search consistent for every supported key type. - let sorted = if combined.num_rows() == 0 { - combined - } else { - let key_vals = key_column_as_u64(combined.column(0).as_ref())?; - let mut order: Vec = (0..key_vals.len() as u32).collect(); - order.sort_by_key(|&i| key_vals[i as usize]); - let indices = UInt32Array::from(order); - sort_batch(&combined, &indices)? - }; + // mmap the unsorted spill; map global row position → (batch, local row). + let temp_path = self + .temp + .path() + .to_str() + .ok_or_else(|| DataFusionError::Execution("feather spill path is not UTF-8".into()))? + .to_string(); + let (_tschema, tbatches, _tmmap) = mmap_feather(&temp_path)?; + let mut offsets: Vec = Vec::with_capacity(tbatches.len() + 1); + let mut acc = 0usize; + offsets.push(0); + for b in &tbatches { + acc += b.num_rows(); + offsets.push(acc); + } - write_ipc_file(&self.path, &sorted)?; + let final_file = File::create(&self.final_path).map_err(|e| { + DataFusionError::Execution(format!("create feather sidecar {}: {e}", self.final_path)) + })?; + let mut out = arrow_ipc::writer::FileWriter::try_new(final_file, &self.schema) + .map_err(|e| DataFusionError::Execution(format!("init feather writer: {e}")))?; + + let ncols = self.schema.fields().len(); + let mut start = 0usize; + while start < n { + let end = (start + BUILD_CHUNK_ROWS).min(n); + // (batch, local) pairs for this chunk, in sorted-key order. + let pairs: Vec<(usize, usize)> = order[start..end] + .iter() + .map(|&gp| { + let gp = gp as usize; + let b = offsets.partition_point(|&o| o <= gp) - 1; + (b, gp - offsets[b]) + }) + .collect(); + let cols: Vec = (0..ncols) + .map(|c| { + let arrays: Vec<&dyn Array> = + tbatches.iter().map(|b| b.column(c).as_ref()).collect(); + compute::interleave(&arrays, &pairs) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + }) + .collect::>()?; + let batch = RecordBatch::try_new(self.schema.clone(), cols) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + out.write(&batch) + .map_err(|e| DataFusionError::Execution(format!("write feather batch: {e}")))?; + start = end; + } + out.finish() + .map_err(|e| DataFusionError::Execution(format!("finalize feather sidecar: {e}")))?; tracing::info!( "Feather sidecar '{}' built: {} rows, input_already_sorted={}.", - self.path, - sorted.num_rows(), + self.final_path, + n, self.input_sorted, ); - FeatherLookupProvider::from_batches(self.schema, vec![sorted]) - } -} - -/// Lift the key column into a contiguous `Vec`, erroring on a null key (row -/// keys must be non-null). The `as u64` cast inside `extract_keys_as_u64` defines -/// the single key-ordering domain this provider uses everywhere — the build sort, -/// the open-time sortedness check, and the `fetch_by_keys` binary search. -fn key_column_as_u64(col: &dyn Array) -> DFResult> { - extract_keys_as_u64(col)? - .into_iter() - .map(|k| { - k.ok_or_else(|| { - DataFusionError::Execution( - "FeatherLookupProvider: key column has a null value; \ - row keys must be non-null" - .into(), - ) - }) - }) - .collect() -} - -/// `take` every column of `batch` by `indices`, preserving the schema. -fn sort_batch(batch: &RecordBatch, indices: &UInt32Array) -> DFResult { - let cols: Vec = batch - .columns() - .iter() - .map(|c| { - compute::take(c.as_ref(), indices, None) - .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) - }) - .collect::>()?; - RecordBatch::try_new(batch.schema(), cols) - .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) -} - -/// Write a single batch as an Arrow IPC *file* (Feather v2). Uncompressed: the -/// fast path needs on-disk bytes == in-memory layout (zero-copy take), and IPC -/// whole-buffer compression would force a full-column decode per scattered row. -fn write_ipc_file(path: &str, batch: &RecordBatch) -> DFResult<()> { - let file = std::fs::File::create(path) - .map_err(|e| DataFusionError::Execution(format!("create feather sidecar {path}: {e}")))?; - let mut writer = arrow_ipc::writer::FileWriter::try_new(file, &batch.schema()) - .map_err(|e| DataFusionError::Execution(format!("init feather writer {path}: {e}")))?; - if batch.num_rows() > 0 { - writer - .write(batch) - .map_err(|e| DataFusionError::Execution(format!("write feather batch {path}: {e}")))?; + // `self.temp` (the spill) is removed when it drops at end of scope. + FeatherLookupProvider::open(&self.final_path) } - writer - .finish() - .map_err(|e| DataFusionError::Execution(format!("finalize feather sidecar {path}: {e}")))?; - Ok(()) } diff --git a/tests/feather_provider_test.rs b/tests/feather_provider_test.rs index 188d364..0a09b8b 100644 --- a/tests/feather_provider_test.rs +++ b/tests/feather_provider_test.rs @@ -805,3 +805,59 @@ async fn table_provider_scan_roundtrips() { "projected scan returns only 2 columns" ); } + +// ── Multi-batch (spill-sort + coarse-index) ───────────────────────────────────── + +/// Build > BUILD_CHUNK_ROWS rows from fully-unsorted input so the spill-sort runs +/// and the final file is multi-batch, then check fetch parity vs SQLite — with +/// keys straddling the internal batch boundary to exercise the coarse index + +/// cross-batch gather. +#[cfg(feature = "sqlite-provider")] +#[tokio::test] +async fn parity_multibatch_shuffled_build() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(20_000, 1024, 0xABCD); + + // Feather: push batches reversed + each batch's rows reversed → spill is fully + // unsorted, forcing the external sort to do real work. + let fpath = dir.path().join("mb.feather"); + let value_cols: Vec = (1..schema.fields().len()).collect(); + let mut fb = + FeatherSidecarBuilder::begin(fpath.to_str().unwrap(), schema.clone(), 0, value_cols) + .unwrap(); + for b in batches.iter().rev() { + fb.push_batch(&reverse_rows(b)).unwrap(); + } + assert!(!fb.input_was_sorted(), "reversed input should report unsorted"); + let feather = fb.finish().unwrap(); + assert_eq!(feather.len(), rowids.len()); + + // SQLite oracle (order-independent B-tree). + let sqlite = build_sqlite(&dir, schema.clone(), &batches); + + let n = rowids.len(); + let mut rng = Rng(7); + let mut keys: Vec = (0..1500) + .map(|_| rowids[rng.below(n as u64) as usize] as u64) + .collect(); + // Straddle the first internal batch boundary (sorted chunk size 8192). + for &i in &[0usize, 8191, 8192, 8193, n - 1] { + keys.push(rowids[i] as u64); + } + + let f = feather.fetch_by_keys(&keys, "rowid", None).await.unwrap(); + let s = sqlite.fetch_by_keys(&keys, "rowid", None).await.unwrap(); + assert_eq!(fmt(&f), fmt(&s), "multi-batch shuffled-build parity mismatch"); + + // Projected fetch across the boundary too. + let proj = vec![0usize, 3, 6]; + let f = feather + .fetch_by_keys(&keys, "rowid", Some(&proj)) + .await + .unwrap(); + let s = sqlite + .fetch_by_keys(&keys, "rowid", Some(&proj)) + .await + .unwrap(); + assert_eq!(fmt(&f), fmt(&s), "multi-batch projected parity mismatch"); +} From 0266253c08e997f46251ff46e53a0bff5c85d345 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Wed, 17 Jun 2026 17:13:03 +0530 Subject: [PATCH 5/5] fix(feather): guard corrupt footer; atomic publish Address the mmap/spill-rewrite review: - open(): guard the footer length against the file size and range-check every IPC block offset/length before slicing, so a corrupt/foreign file returns a typed error instead of panicking (no-panic-on-open contract). Must-fix. - finish(): publish via write-to-sibling-temp + atomic rename instead of truncating the final path in place, so a concurrent reader's live mmap never observes a truncated inode (SIGBUS/torn reads); also crash-safe. - finish(): use usize sort indices (drops the >4.29B-row u32 truncation). - Docs: correct the build-memory wording (u64 keys + usize permutation, not payload), the read-path heap-vs-page-in nuance, and note the single-dictionary constraint for Dictionary payload columns. - Tests: corrupt-footer rejection, empty/single-row builds, exact chunk-boundary gather, and cross-push duplicate-key rejection. --- src/feather_provider.rs | 92 ++++++++++++++++----- tests/feather_provider_test.rs | 142 ++++++++++++++++++++++++++++++++- 2 files changed, 212 insertions(+), 22 deletions(-) diff --git a/src/feather_provider.rs b/src/feather_provider.rs index 34521dd..5a8344c 100644 --- a/src/feather_provider.rs +++ b/src/feather_provider.rs @@ -16,19 +16,26 @@ // (`FileDecoder` over a `Buffer` backed by the mapping), so column buffers // point into the mapped file. `take` over scattered keys faults in only the // touched pages; only the small K-row output is heap-allocated. Resident -// payload memory is bounded by the working set, not the file size. +// *heap* is bounded by the working set, not the file size (open() does page +// in the key column once to validate ordering, but those are file-backed, +// reclaimable pages, not heap). // // * BUILD: a bounded-memory external (spill) sort. Rows stream UNSORTED to a -// temp IPC file (O(one batch) payload resident); only an O(N)·8 B key index -// is kept in memory. At `finish` the keys are argsorted and the rows are -// gathered in sorted order from the mmap'd temp via `interleave`, in bounded -// chunks, into the final sorted file. No full-payload buffering, and no -// assumption about the input scan's row order. +// temp IPC file (O(one batch) payload resident); the only O(N) in-memory +// state is the key index — `u64` keys plus, at `finish`, a `usize` sort +// permutation (key-index bytes, not payload). At `finish` the keys are +// argsorted and the rows are gathered in sorted order from the mmap'd temp +// via `interleave`, in bounded chunks, into a temp file that is atomically +// renamed into place. No full-payload buffering, and no assumption about the +// input scan's row order. // // Because the payload is stored verbatim Arrow, there is zero type conversion: // Decimal / Struct / Map / FixedSizeList / Dictionary / Timestamp(tz) / nested // List all round-trip losslessly (unlike the SQLite provider, which rejects or -// coerces them). +// coerces them). One caveat for `Dictionary` payload columns: the IPC writer +// requires a consistent dictionary across the spilled batches, so a build whose +// input batches carry *different* dictionaries for the same column is rejected +// at build time rather than silently corrupted. use std::any::Any; use std::fmt; @@ -88,6 +95,14 @@ fn mmap_feather(path: &str) -> DFResult<(SchemaRef, Vec, Buffer)> { DataFusionError::Execution(format!("feather sidecar {path}: bad IPC trailer")) })?) .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))?; + // `read_footer_length` only validates the trailing magic, not that the footer + // fits in the file. Guard the subtraction so a corrupt length returns an error + // instead of underflowing/panicking on the slice. + if footer_len > trailer_start { + return Err(DataFusionError::Execution(format!( + "feather sidecar {path}: footer length {footer_len} exceeds file size" + ))); + } let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]) .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))?; @@ -95,10 +110,27 @@ fn mmap_feather(path: &str) -> DFResult<(SchemaRef, Vec, Buffer)> { DataFusionError::Execution(format!("feather sidecar {path}: footer has no schema")) })?)); + // A block's [offset, offset+meta+body) must lie within the file; otherwise + // `Buffer::slice_with_length` panics. Validate before slicing so corrupt + // offsets surface as a typed error. + let block_range = |offset: i64, meta: i32, body: i64| -> DFResult<(usize, usize)> { + let block_len = i64::from(meta).checked_add(body).filter(|&l| l >= 0); + let end = block_len.and_then(|l| offset.checked_add(l)); + match (offset >= 0, block_len, end) { + (true, Some(l), Some(e)) if (e as u64) <= len as u64 => { + Ok((offset as usize, l as usize)) + } + _ => Err(DataFusionError::Execution(format!( + "feather sidecar {path}: corrupt IPC block offset/length" + ))), + } + }; + let mut decoder = FileDecoder::new(schema.clone(), footer.version()); for block in footer.dictionaries().iter().flatten() { - let block_len = block.bodyLength() as usize + block.metaDataLength() as usize; - let data = buffer.slice_with_length(block.offset() as usize, block_len); + let (off, block_len) = + block_range(block.offset(), block.metaDataLength(), block.bodyLength())?; + let data = buffer.slice_with_length(off, block_len); decoder .read_dictionary(block, &data) .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))?; @@ -107,8 +139,9 @@ fn mmap_feather(path: &str) -> DFResult<(SchemaRef, Vec, Buffer)> { let mut batches = Vec::new(); if let Some(record_batches) = footer.recordBatches() { for block in record_batches { - let block_len = block.bodyLength() as usize + block.metaDataLength() as usize; - let data = buffer.slice_with_length(block.offset() as usize, block_len); + let (off, block_len) = + block_range(block.offset(), block.metaDataLength(), block.bodyLength())?; + let data = buffer.slice_with_length(off, block_len); if let Some(batch) = decoder .read_record_batch(block, &data) .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))? @@ -549,11 +582,12 @@ impl FeatherSidecarBuilder { let n = self.keys.len(); // Stable argsort of the keys (u64 domain), then reject any duplicate — - // strictly-ascending is the provider's invariant. - let mut order: Vec = (0..n as u32).collect(); - order.sort_by_key(|&i| self.keys[i as usize]); + // strictly-ascending is the provider's invariant. `usize` indices avoid a + // truncation ceiling on the row count. + let mut order: Vec = (0..n).collect(); + order.sort_by_key(|&i| self.keys[i]); for w in order.windows(2) { - if self.keys[w[0] as usize] >= self.keys[w[1] as usize] { + if self.keys[w[0]] >= self.keys[w[1]] { return Err(DataFusionError::Execution( "FeatherSidecarBuilder: duplicate row key; keys must be unique".into(), )); @@ -576,10 +610,24 @@ impl FeatherSidecarBuilder { offsets.push(acc); } - let final_file = File::create(&self.final_path).map_err(|e| { - DataFusionError::Execution(format!("create feather sidecar {}: {e}", self.final_path)) - })?; - let mut out = arrow_ipc::writer::FileWriter::try_new(final_file, &self.schema) + // Write to a sibling temp file and atomically rename into place, so a + // concurrent reader's live mmap of an existing sidecar keeps pointing at + // the old (now-unlinked) inode rather than observing a truncated file + // (SIGBUS / torn reads). Also gives crash-safety: a half-written build + // never leaves a corrupt sidecar at `final_path`. + let final_dir = std::path::Path::new(&self.final_path) + .parent() + .filter(|p| !p.as_os_str().is_empty()) + .map(std::path::Path::to_path_buf) + .unwrap_or_else(|| std::path::PathBuf::from(".")); + let out_tmp = tempfile::Builder::new() + .prefix(".feather-build-") + .tempfile_in(&final_dir) + .map_err(|e| DataFusionError::Execution(format!("create feather output temp: {e}")))?; + let out_handle = out_tmp + .reopen() + .map_err(|e| DataFusionError::Execution(format!("open feather output temp: {e}")))?; + let mut out = arrow_ipc::writer::FileWriter::try_new(out_handle, &self.schema) .map_err(|e| DataFusionError::Execution(format!("init feather writer: {e}")))?; let ncols = self.schema.fields().len(); @@ -590,7 +638,6 @@ impl FeatherSidecarBuilder { let pairs: Vec<(usize, usize)> = order[start..end] .iter() .map(|&gp| { - let gp = gp as usize; let b = offsets.partition_point(|&o| o <= gp) - 1; (b, gp - offsets[b]) }) @@ -612,6 +659,11 @@ impl FeatherSidecarBuilder { out.finish() .map_err(|e| DataFusionError::Execution(format!("finalize feather sidecar: {e}")))?; + // Atomic publish: rename the completed temp over the final path. + out_tmp.persist(&self.final_path).map_err(|e| { + DataFusionError::Execution(format!("publish feather sidecar {}: {e}", self.final_path)) + })?; + tracing::info!( "Feather sidecar '{}' built: {} rows, input_already_sorted={}.", self.final_path, diff --git a/tests/feather_provider_test.rs b/tests/feather_provider_test.rs index 0a09b8b..4b193ce 100644 --- a/tests/feather_provider_test.rs +++ b/tests/feather_provider_test.rs @@ -828,7 +828,10 @@ async fn parity_multibatch_shuffled_build() { for b in batches.iter().rev() { fb.push_batch(&reverse_rows(b)).unwrap(); } - assert!(!fb.input_was_sorted(), "reversed input should report unsorted"); + assert!( + !fb.input_was_sorted(), + "reversed input should report unsorted" + ); let feather = fb.finish().unwrap(); assert_eq!(feather.len(), rowids.len()); @@ -847,7 +850,11 @@ async fn parity_multibatch_shuffled_build() { let f = feather.fetch_by_keys(&keys, "rowid", None).await.unwrap(); let s = sqlite.fetch_by_keys(&keys, "rowid", None).await.unwrap(); - assert_eq!(fmt(&f), fmt(&s), "multi-batch shuffled-build parity mismatch"); + assert_eq!( + fmt(&f), + fmt(&s), + "multi-batch shuffled-build parity mismatch" + ); // Projected fetch across the boundary too. let proj = vec![0usize, 3, 6]; @@ -861,3 +868,134 @@ async fn parity_multibatch_shuffled_build() { .unwrap(); assert_eq!(fmt(&f), fmt(&s), "multi-batch projected parity mismatch"); } + +// ── Regression + edge cases for the mmap + spill-sort rewrite ──────────────────── + +/// `open` must return an error (not panic) on a file whose IPC trailer claims a +/// footer larger than the file — the corrupt-footer guard. +#[test] +fn open_rejects_corrupt_footer_length() { + let dir = tempdir().unwrap(); + let p = dir.path().join("badfooter.feather"); + // 32 bytes: padding, then a valid 10-byte trailer [footer_len i32 LE][b"ARROW1"] + // with an oversized footer_len. + let mut bytes = vec![0u8; 22]; + bytes.extend_from_slice(&1_000_000_i32.to_le_bytes()); + bytes.extend_from_slice(b"ARROW1"); + std::fs::write(&p, &bytes).unwrap(); + assert!( + FeatherLookupProvider::open(p.to_str().unwrap()).is_err(), + "oversized footer_len must error, not panic" + ); +} + +/// A build with zero pushed rows yields a valid, empty, re-openable sidecar. +#[tokio::test] +async fn empty_build_is_valid() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + let p = dir.path().join("empty.feather"); + let builder = FeatherSidecarBuilder::begin(p.to_str().unwrap(), schema, 0, vec![1]).unwrap(); + let provider = builder.finish().unwrap(); + assert!(provider.is_empty()); + assert!( + provider + .fetch_by_keys(&[1, 2, 3], "rowid", None) + .await + .unwrap() + .is_empty() + ); + let reopened = FeatherLookupProvider::open(p.to_str().unwrap()).unwrap(); + assert_eq!(reopened.len(), 0); +} + +/// Single-row build (smallest non-empty case). +#[tokio::test] +async fn single_row_build() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![42_i64])), + Arc::new(StringArray::from(vec![Some("answer")])), + ], + ) + .unwrap(); + let p = dir.path().join("one.feather"); + let mut b = FeatherSidecarBuilder::begin(p.to_str().unwrap(), schema, 0, vec![1]).unwrap(); + b.push_batch(&batch).unwrap(); + let provider = b.finish().unwrap(); + assert_eq!(provider.len(), 1); + let out = provider.fetch_by_keys(&[42], "rowid", None).await.unwrap(); + assert_eq!(out[0].num_rows(), 1); + assert!( + provider + .fetch_by_keys(&[7], "rowid", None) + .await + .unwrap() + .is_empty() + ); +} + +/// Build exactly 2× BUILD_CHUNK_ROWS (16384) rows so the gather hits an exact +/// chunk boundary, and check parity vs SQLite for keys straddling it. +#[cfg(feature = "sqlite-provider")] +#[tokio::test] +async fn build_at_exact_chunk_boundary() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(16_384, 4096, 0xBEEF); + let feather = build_feather(&dir, schema.clone(), &batches); + let sqlite = build_sqlite(&dir, schema.clone(), &batches); + assert_eq!(feather.len(), 16_384); + + let mut keys: Vec = [0usize, 8191, 8192, 8193, 16_383] + .iter() + .map(|&i| rowids[i] as u64) + .collect(); + let mut rng = Rng(1); + for _ in 0..500 { + keys.push(rowids[rng.below(16_384) as usize] as u64); + } + let f = feather.fetch_by_keys(&keys, "rowid", None).await.unwrap(); + let s = sqlite.fetch_by_keys(&keys, "rowid", None).await.unwrap(); + assert_eq!(fmt(&f), fmt(&s), "chunk-boundary parity mismatch"); +} + +/// Duplicate keys split across separate push_batch calls must be rejected at +/// finish (the dup check runs over the global sorted order, not per-batch). +#[test] +fn duplicate_keys_across_pushes_rejected() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + let mk = |rid: Vec, names: Vec<&'static str>| { + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(rid)), + Arc::new(StringArray::from( + names.into_iter().map(Some).collect::>(), + )), + ], + ) + .unwrap() + }; + let p = dir.path().join("dup.feather"); + let mut b = + FeatherSidecarBuilder::begin(p.to_str().unwrap(), schema.clone(), 0, vec![1]).unwrap(); + b.push_batch(&mk(vec![1, 2], vec!["a", "b"])).unwrap(); + b.push_batch(&mk(vec![2, 3], vec!["c", "d"])).unwrap(); // key 2 duplicated across pushes + assert!( + b.finish().is_err(), + "duplicate key across pushes must be rejected" + ); +}