diff --git a/Cargo.lock b/Cargo.lock index c850dbb..5aec148 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1405,11 +1405,13 @@ name = "datafusion-vector-search-ext" version = "0.1.0" dependencies = [ "arrow-array", + "arrow-ipc", "arrow-schema", "async-trait", "bytes", "datafusion", "futures", + "memmap2", "object_store", "parquet", "rusqlite", @@ -2112,6 +2114,15 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "memmap2" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714098028fe011992e1c3962653c96b2d578c4b4bce9036e15ff220319b1e0e3" +dependencies = [ + "libc", +] + [[package]] name = "miniz_oxide" version = "0.8.9" diff --git a/Cargo.toml b/Cargo.toml index 46962d8..363e913 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ license = "MIT OR Apache-2.0" [features] parquet-provider = ["dep:parquet", "dep:object_store", "dep:bytes"] sqlite-provider = ["dep:rusqlite", "dep:serde_json", "dep:parquet"] +feather-provider = ["dep:arrow-ipc", "dep:memmap2", "dep:tempfile"] [dependencies] tracing = "0.1" @@ -29,6 +30,14 @@ bytes = { version = "1", optional = true } rusqlite = { version = "0.32", optional = true, features = ["bundled"] } serde_json = { version = "1", optional = true } +# feather-provider — Arrow IPC (Feather v2). `take`/`concat_batches`/`interleave` +# come from `datafusion::arrow::compute`; the extra deps are the IPC reader/writer, +# `memmap2` for the zero-copy mmap read path, and `tempfile` for the bounded-memory +# (spill-sort) build path. 
+arrow-ipc = { version = "58", optional = true } +memmap2 = { version = "0.9", optional = true } +tempfile = { version = "3", optional = true } + [dev-dependencies] tokio = { version = "1", features = ["rt-multi-thread", "macros"] } tempfile = "3" diff --git a/src/feather_provider.rs b/src/feather_provider.rs new file mode 100644 index 0000000..5a8344c --- /dev/null +++ b/src/feather_provider.rs @@ -0,0 +1,677 @@ +// feather_provider.rs — Arrow/Feather (Arrow IPC) positional PointLookupProvider. +// +// A drop-in alternative to `SqliteLookupProvider` for the DuckLake vector-search +// payload sidecar. The payload is stored as an Arrow IPC file sorted ascending by +// the (sparse / holey) `rowid` key. Hydration is: +// +// 1. a coarse per-batch first-key index narrows to the batch that may hold a +// requested key, +// 2. `slice::partition_point` binary-searches that batch's rowid column, +// 3. an exact-match guard (`rowid[pos] == key`) rejects a missing rowid, then +// 4. `take` pulls the matched physical rows out of the payload columns. +// +// Memory parity with SQLite (which pages its B-tree) comes from two pieces: +// +// * READ: the file is `mmap`'d and the Arrow arrays are decoded ZERO-COPY +// (`FileDecoder` over a `Buffer` backed by the mapping), so column buffers +// point into the mapped file. `take` over scattered keys faults in only the +// touched pages; only the small K-row output is heap-allocated. Resident +// *heap* is bounded by the working set, not the file size (open() does page +// in the key column once to validate ordering, but those are file-backed, +// reclaimable pages, not heap). +// +// * BUILD: a bounded-memory external (spill) sort. Rows stream UNSORTED to a +// temp IPC file (O(one batch) payload resident); the only O(N) in-memory +// state is the key index — `u64` keys plus, at `finish`, a `usize` sort +// permutation (key-index bytes, not payload). 
At `finish` the keys are +// argsorted and the rows are gathered in sorted order from the mmap'd temp +// via `interleave`, in bounded chunks, into a temp file that is atomically +// renamed into place. No full-payload buffering, and no assumption about the +// input scan's row order. +// +// Because the payload is stored verbatim Arrow, there is zero type conversion: +// Decimal / Struct / Map / FixedSizeList / Dictionary / Timestamp(tz) / nested +// List all round-trip losslessly (unlike the SQLite provider, which rejects or +// coerces them). One caveat for `Dictionary` payload columns: the IPC writer +// requires a consistent dictionary across the spilled batches, so a build whose +// input batches carry *different* dictionaries for the same column is rejected +// at build time rather than silently corrupted. + +use std::any::Any; +use std::fmt; +use std::fs::File; +use std::ptr::NonNull; +use std::sync::Arc; + +use arrow_array::{Array, ArrayRef, Int32Array, Int64Array, RecordBatch, UInt32Array, UInt64Array}; +use arrow_schema::{Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion::arrow::buffer::Buffer; +use datafusion::arrow::compute; +use datafusion::catalog::{Session, TableProvider}; +use datafusion::common::Result as DFResult; +use datafusion::datasource::MemTable; +use datafusion::error::DataFusionError; +use datafusion::logical_expr::{Expr, TableType}; +use datafusion::physical_plan::ExecutionPlan; + +use crate::lookup::{PointLookupProvider, extract_keys_as_u64}; + +/// Rows per output batch when gathering the sorted result at build time. Bounds +/// the per-chunk `interleave` working set. 
+const BUILD_CHUNK_ROWS: usize = 8192; + +// ── mmap + zero-copy IPC decode ───────────────────────────────────────────────── + +/// Memory-map `path` and decode every Arrow IPC record batch ZERO-COPY: the +/// returned arrays' buffers point into the mapping (kept alive by the returned +/// [`Buffer`]'s custom allocation owner), so nothing is copied onto the heap. +fn mmap_feather(path: &str) -> DFResult<(SchemaRef, Vec, Buffer)> { + use arrow_ipc::convert::fb_to_schema; + use arrow_ipc::reader::{FileDecoder, read_footer_length}; + use arrow_ipc::root_as_footer; + + let file = File::open(path) + .map_err(|e| DataFusionError::Execution(format!("open feather sidecar {path}: {e}")))?; + // SAFETY: the file is treated as immutable for the provider's lifetime (built + // once, then read-only). The mapping is owned by the `Buffer` below. + let mmap = unsafe { memmap2::Mmap::map(&file) } + .map_err(|e| DataFusionError::Execution(format!("mmap feather sidecar {path}: {e}")))?; + let len = mmap.len(); + if len < 10 { + return Err(DataFusionError::Execution(format!( + "feather sidecar {path} is too small to be a valid Arrow IPC file ({len} bytes)" + ))); + } + let ptr = NonNull::new(mmap.as_ptr() as *mut u8).ok_or_else(|| { + DataFusionError::Execution(format!("feather sidecar {path}: null mmap pointer")) + })?; + // SAFETY: `ptr`/`len` describe the mapping, and the `Arc` owner keeps it + // alive for as long as any slice of this Buffer (or array derived from it) lives. + let buffer = unsafe { Buffer::from_custom_allocation(ptr, len, Arc::new(mmap)) }; + + let trailer_start = len - 10; + let footer_len = read_footer_length(buffer[trailer_start..].try_into().map_err(|_| { + DataFusionError::Execution(format!("feather sidecar {path}: bad IPC trailer")) + })?) + .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))?; + // `read_footer_length` only validates the trailing magic, not that the footer + // fits in the file. 
Guard the subtraction so a corrupt length returns an error + // instead of underflowing/panicking on the slice. + if footer_len > trailer_start { + return Err(DataFusionError::Execution(format!( + "feather sidecar {path}: footer length {footer_len} exceeds file size" + ))); + } + let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]) + .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))?; + + let schema: SchemaRef = Arc::new(fb_to_schema(footer.schema().ok_or_else(|| { + DataFusionError::Execution(format!("feather sidecar {path}: footer has no schema")) + })?)); + + // A block's [offset, offset+meta+body) must lie within the file; otherwise + // `Buffer::slice_with_length` panics. Validate before slicing so corrupt + // offsets surface as a typed error. + let block_range = |offset: i64, meta: i32, body: i64| -> DFResult<(usize, usize)> { + let block_len = i64::from(meta).checked_add(body).filter(|&l| l >= 0); + let end = block_len.and_then(|l| offset.checked_add(l)); + match (offset >= 0, block_len, end) { + (true, Some(l), Some(e)) if (e as u64) <= len as u64 => { + Ok((offset as usize, l as usize)) + } + _ => Err(DataFusionError::Execution(format!( + "feather sidecar {path}: corrupt IPC block offset/length" + ))), + } + }; + + let mut decoder = FileDecoder::new(schema.clone(), footer.version()); + for block in footer.dictionaries().iter().flatten() { + let (off, block_len) = + block_range(block.offset(), block.metaDataLength(), block.bodyLength())?; + let data = buffer.slice_with_length(off, block_len); + decoder + .read_dictionary(block, &data) + .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))?; + } + + let mut batches = Vec::new(); + if let Some(record_batches) = footer.recordBatches() { + for block in record_batches { + let (off, block_len) = + block_range(block.offset(), block.metaDataLength(), block.bodyLength())?; + let data = buffer.slice_with_length(off, block_len); + 
if let Some(batch) = decoder + .read_record_batch(block, &data) + .map_err(|e| DataFusionError::Execution(format!("feather sidecar {path}: {e}")))? + { + batches.push(batch); + } + } + } + Ok((schema, batches, buffer)) +} + +// ── Key helpers (u64 ordering domain — matches `extract_keys_as_u64`) ─────────── + +/// Read the key at `row` as `u64`. Supports the same integer key types as +/// `extract_keys_as_u64` (Int64/UInt64/Int32/UInt32). +fn key_at(col: &dyn Array, row: usize) -> DFResult { + let any = col.as_any(); + if let Some(a) = any.downcast_ref::() { + return Ok(a.value(row) as u64); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.value(row)); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.value(row) as u64); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.value(row) as u64); + } + Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: key column type {:?} is not supported; use Int64/UInt64/Int32/UInt32", + col.data_type() + ))) +} + +/// First index `i` in `col` whose key (compared as `u64`) is `>= target`. +/// Binary search over the sorted key column — touches only the pages it reads. 
+fn partition_point_u64(col: &dyn Array, target: u64) -> DFResult { + let any = col.as_any(); + if let Some(a) = any.downcast_ref::() { + return Ok(a.values().partition_point(|&v| (v as u64) < target)); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.values().partition_point(|&v| v < target)); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.values().partition_point(|&v| (v as u64) < target)); + } + if let Some(a) = any.downcast_ref::() { + return Ok(a.values().partition_point(|&v| (v as u64) < target)); + } + Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: key column type {:?} is not supported; use Int64/UInt64/Int32/UInt32", + col.data_type() + ))) +} + +// ── Provider ────────────────────────────────────────────────────────────────── + +/// mmap-backed, sorted-by-key Arrow positional [`PointLookupProvider`]. +/// +/// Holds the payload as zero-copy [`RecordBatch`]es referencing the mmap, in +/// ascending-key order, plus a tiny first-key-per-batch coarse index. Built by +/// [`FeatherSidecarBuilder`] or opened from a `.feather` file with +/// [`open`](Self::open). +pub struct FeatherLookupProvider { + schema: SchemaRef, + /// Payload batches, ascending by key and non-overlapping across batches. + /// Arrays reference the mmap (see `_mmap`). + batches: Vec, + /// Coarse index: first key of `batches[i]`, ascending. Narrows a lookup to a + /// single batch before the in-batch binary search. + batch_first_key: Vec, + /// Keeps the memory mapping alive; the batch arrays' buffers point into it. 
+ _mmap: Buffer, + key_col: String, + n_rows: usize, +} + +impl fmt::Debug for FeatherLookupProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "FeatherLookupProvider(key_col={}, rows={}, batches={}, schema_cols={})", + self.key_col, + self.n_rows, + self.batches.len(), + self.schema.fields().len() + ) + } +} + +impl FeatherLookupProvider { + /// Open an existing `.feather` (Arrow IPC file) sidecar via mmap. + /// + /// The on-disk schema is self-describing, so the provider's + /// [`schema`](PointLookupProvider::schema) is taken from the file. Fails if + /// the stored key column is not strictly ascending — the binary search + /// depends on it, and the builder always writes it sorted, so a violation + /// signals a corrupt or foreign file. + pub fn open(path: &str) -> DFResult { + let (schema, batches, mmap) = mmap_feather(path)?; + Self::from_parts(schema, batches, mmap) + } + + fn from_parts(schema: SchemaRef, batches: Vec, mmap: Buffer) -> DFResult { + if schema.fields().is_empty() { + return Err(DataFusionError::Execution( + "FeatherLookupProvider: schema has no columns; field 0 must be the key column" + .into(), + )); + } + let key_col = schema.field(0).name().clone(); + + // Build the coarse index and verify the global ascending+unique invariant + // the binary search relies on (within each batch and across batches). + let mut batch_first_key: Vec = Vec::with_capacity(batches.len()); + let mut n_rows = 0usize; + let mut prev_last: Option = None; + let mut kept: Vec = Vec::with_capacity(batches.len()); + for batch in batches { + let nrows = batch.num_rows(); + if nrows == 0 { + continue; + } + let keycol = batch.column(0).as_ref(); + if keycol.null_count() > 0 { + return Err(DataFusionError::Execution( + "FeatherLookupProvider: key column has a null value; row keys must be non-null" + .into(), + )); + } + let first = key_at(keycol, 0)?; + // Strictly ascending within the batch. 
+ let mut prev = first; + for r in 1..nrows { + let k = key_at(keycol, r)?; + if k <= prev { + return Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: key column '{key_col}' is not strictly ascending \ + (unsorted or duplicate keys); the sidecar is corrupt or was not built by \ + FeatherSidecarBuilder" + ))); + } + prev = k; + } + // Strictly ascending across the batch boundary. + if prev_last.is_some_and(|pl| first <= pl) { + return Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: key column '{key_col}' is not strictly ascending \ + across batches; the sidecar is corrupt or was not built by \ + FeatherSidecarBuilder" + ))); + } + prev_last = Some(prev); + batch_first_key.push(first); + n_rows += nrows; + kept.push(batch); + } + + Ok(Self { + schema, + batches: kept, + batch_first_key, + _mmap: mmap, + key_col, + n_rows, + }) + } + + pub fn len(&self) -> usize { + self.n_rows + } + pub fn is_empty(&self) -> bool { + self.n_rows == 0 + } +} + +#[async_trait] +impl PointLookupProvider for FeatherLookupProvider { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + async fn fetch_by_keys( + &self, + keys: &[u64], + _key_col: &str, + projection: Option<&[usize]>, + ) -> DFResult> { + if keys.is_empty() || self.batches.is_empty() { + return Ok(vec![]); + } + + // Validate the projection up front and gracefully (like + // `RecordBatch::project`), rather than panicking on an out-of-range index. 
+ let nfields = self.schema.fields().len(); + let proj: Vec = match projection { + None => (0..nfields).collect(), + Some(idxs) => { + if let Some(&bad) = idxs.iter().find(|&&i| i >= nfields) { + return Err(DataFusionError::Execution(format!( + "FeatherLookupProvider: projection index {bad} out of bounds for schema \ + with {nfields} columns" + ))); + } + idxs.to_vec() + } + }; + let out_schema: SchemaRef = Arc::new(Schema::new( + proj.iter() + .map(|&i| self.schema.field(i).clone()) + .collect::>(), + )); + + // Sort + dedup the requested keys: sorting yields ascending output + // (matching SQLite's `ORDER BY rowid`); dedup matches `WHERE key IN (...)`. + let mut want = keys.to_vec(); + want.sort_unstable(); + want.dedup(); + + // Resolve each present key to (batch index, local row), in ascending key + // order. The coarse index narrows to a batch; partition_point + exact + // match locate the row within it. + let mut matches: Vec<(usize, u32)> = Vec::with_capacity(want.len()); + for &k in &want { + let cb = self.batch_first_key.partition_point(|&fk| fk <= k); + if cb == 0 { + continue; // below the smallest stored key + } + let b = cb - 1; + let keycol = self.batches[b].column(0).as_ref(); + let pos = partition_point_u64(keycol, k)?; + if pos < self.batches[b].num_rows() && key_at(keycol, pos)? == k { + matches.push((b, pos as u32)); + } + } + if matches.is_empty() { + return Ok(vec![]); + } + + // `matches` is grouped by batch (ascending k → non-decreasing batch idx). + // `take` each batch's projected columns once, then concat into one batch. 
+ let mut out_batches: Vec = Vec::new(); + let mut j = 0; + while j < matches.len() { + let b = matches[j].0; + let mut locals: Vec = Vec::new(); + while j < matches.len() && matches[j].0 == b { + locals.push(matches[j].1); + j += 1; + } + let idx = UInt32Array::from(locals); + let cols: Vec = proj + .iter() + .map(|&c| { + compute::take(self.batches[b].column(c).as_ref(), &idx, None) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + }) + .collect::>()?; + out_batches.push( + RecordBatch::try_new(out_schema.clone(), cols) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?, + ); + } + + let combined = compute::concat_batches(&out_schema, &out_batches) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + Ok(vec![combined]) + } +} + +// ── TableProvider ───────────────────────────────────────────────────────────── +// +// The payload is already addressable (mmap'd batches), so a full scan is a cheap +// MemTable over them. Lets DataFusion resolve column names when the provider is +// registered as a table. + +#[async_trait] +impl TableProvider for FeatherLookupProvider { + fn as_any(&self) -> &dyn Any { + self + } + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + fn table_type(&self) -> TableType { + TableType::Base + } + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + let mem = MemTable::try_new(self.schema.clone(), vec![self.batches.clone()])?; + mem.scan(state, projection, &[], None).await + } +} + +// ── Streaming sidecar builder (bounded-memory external sort) ───────────────────── + +/// Incremental builder for a [`FeatherLookupProvider`], the Feather analogue of +/// [`SqliteSidecarBuilder`](crate::sqlite_provider::SqliteSidecarBuilder). +/// +/// Takes input [`RecordBatch`]es one at a time (e.g. 
from the DuckLake +/// snapshot-pinned, row-lineage scan), reads each row's key from a designated +/// column, and projects the value columns into the output schema. +/// +/// **Bounded memory, order-agnostic.** Feather binary search needs the on-disk +/// key column sorted, but — unlike SQLite's order-independent B-tree — the input +/// scan's row order is not guaranteed (DuckLake `rowid` order from the scan is +/// not reliable). Rather than buffer the whole payload and sort it (O(N) +/// resident) or depend on the scan emitting sorted rowids, this builder spills: +/// [`push_batch`](Self::push_batch) writes rows UNSORTED to a temp IPC file +/// (O(one batch) payload resident), keeping only an O(N)·8 B key index in memory; +/// [`finish`](Self::finish) argsorts the keys and gathers the rows in sorted +/// order from the mmap'd temp via `interleave`, in bounded chunks, into the final +/// sorted file. Duplicate keys are rejected (mirroring SQLite's primary key). +/// +/// The first field of `schema` is the key column; fields 1.. are the stored value +/// columns. `key_col_index` / `value_col_indices` index the *input* batches. +pub struct FeatherSidecarBuilder { + final_path: String, + schema: SchemaRef, + key_col_index: usize, + value_col_indices: Vec, + /// Unsorted spill file; rows are appended in push order. Auto-removed on drop. + temp: tempfile::NamedTempFile, + writer: arrow_ipc::writer::FileWriter, + /// Key of each row, in push (spill) order — the only O(N) in-memory state. + keys: Vec, + last_key: Option, + input_sorted: bool, +} + +impl FeatherSidecarBuilder { + /// Begin a build targeting `path` (the output `.feather` file). Opens the + /// temp spill file immediately. 
+ pub fn begin( + path: &str, + schema: SchemaRef, + key_col_index: usize, + value_col_indices: Vec, + ) -> DFResult { + if schema.fields().is_empty() { + return Err(DataFusionError::Execution( + "FeatherSidecarBuilder: schema has no columns; field 0 must be the key column" + .into(), + )); + } + if schema.fields().len() != value_col_indices.len() + 1 { + return Err(DataFusionError::Execution(format!( + "FeatherSidecarBuilder: schema has {} fields but expected 1 key column + {} value columns", + schema.fields().len(), + value_col_indices.len() + ))); + } + let temp = tempfile::NamedTempFile::new() + .map_err(|e| DataFusionError::Execution(format!("create feather spill file: {e}")))?; + let spill = temp + .reopen() + .map_err(|e| DataFusionError::Execution(format!("open feather spill file: {e}")))?; + let writer = arrow_ipc::writer::FileWriter::try_new(spill, &schema) + .map_err(|e| DataFusionError::Execution(format!("init feather spill writer: {e}")))?; + Ok(Self { + final_path: path.to_string(), + schema, + key_col_index, + value_col_indices, + temp, + writer, + keys: Vec::new(), + last_key: None, + input_sorted: true, + }) + } + + /// Project and spill every row of `batch` (unsorted), recording each row's + /// key. Peak memory is O(one batch) plus the running key index. 
+ pub fn push_batch(&mut self, batch: &RecordBatch) -> DFResult<()> { + let ncols = batch.num_columns(); + if self.key_col_index >= ncols { + return Err(DataFusionError::Execution(format!( + "FeatherSidecarBuilder: key_col_index {} out of range for batch with {ncols} columns", + self.key_col_index + ))); + } + if let Some(&bad) = self.value_col_indices.iter().find(|&&i| i >= ncols) { + return Err(DataFusionError::Execution(format!( + "FeatherSidecarBuilder: value column index {bad} out of range for batch with {ncols} columns" + ))); + } + + let key_col = batch.column(self.key_col_index); + if key_col.null_count() > 0 { + return Err(DataFusionError::Execution( + "FeatherSidecarBuilder: key column has a null value; row keys must be non-null" + .into(), + )); + } + for k in extract_keys_as_u64(key_col.as_ref())?.into_iter().flatten() { + if self.last_key.is_some_and(|prev| k < prev) { + self.input_sorted = false; + } + self.last_key = Some(k); + self.keys.push(k); + } + + // Project input columns to output-schema order: key first, then values. + let mut cols: Vec = Vec::with_capacity(self.value_col_indices.len() + 1); + cols.push(batch.column(self.key_col_index).clone()); + for &ci in &self.value_col_indices { + cols.push(batch.column(ci).clone()); + } + let projected = RecordBatch::try_new(self.schema.clone(), cols) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + self.writer + .write(&projected) + .map_err(|e| DataFusionError::Execution(format!("spill feather batch: {e}")))?; + Ok(()) + } + + /// Whether every row arrived in ascending key order (i.e. the spill was + /// already sorted). Informational. + pub fn input_was_sorted(&self) -> bool { + self.input_sorted + } + + /// Sort by key and write the final sorted `.feather`, then open a provider + /// over it. Bounded memory: argsort the key index, then gather rows from the + /// mmap'd spill via `interleave` in `BUILD_CHUNK_ROWS` chunks. 
+ pub fn finish(mut self) -> DFResult { + self.writer + .finish() + .map_err(|e| DataFusionError::Execution(format!("finalize feather spill: {e}")))?; + + let n = self.keys.len(); + // Stable argsort of the keys (u64 domain), then reject any duplicate — + // strictly-ascending is the provider's invariant. `usize` indices avoid a + // truncation ceiling on the row count. + let mut order: Vec = (0..n).collect(); + order.sort_by_key(|&i| self.keys[i]); + for w in order.windows(2) { + if self.keys[w[0]] >= self.keys[w[1]] { + return Err(DataFusionError::Execution( + "FeatherSidecarBuilder: duplicate row key; keys must be unique".into(), + )); + } + } + + // mmap the unsorted spill; map global row position → (batch, local row). + let temp_path = self + .temp + .path() + .to_str() + .ok_or_else(|| DataFusionError::Execution("feather spill path is not UTF-8".into()))? + .to_string(); + let (_tschema, tbatches, _tmmap) = mmap_feather(&temp_path)?; + let mut offsets: Vec = Vec::with_capacity(tbatches.len() + 1); + let mut acc = 0usize; + offsets.push(0); + for b in &tbatches { + acc += b.num_rows(); + offsets.push(acc); + } + + // Write to a sibling temp file and atomically rename into place, so a + // concurrent reader's live mmap of an existing sidecar keeps pointing at + // the old (now-unlinked) inode rather than observing a truncated file + // (SIGBUS / torn reads). Also gives crash-safety: a half-written build + // never leaves a corrupt sidecar at `final_path`. 
+ let final_dir = std::path::Path::new(&self.final_path) + .parent() + .filter(|p| !p.as_os_str().is_empty()) + .map(std::path::Path::to_path_buf) + .unwrap_or_else(|| std::path::PathBuf::from(".")); + let out_tmp = tempfile::Builder::new() + .prefix(".feather-build-") + .tempfile_in(&final_dir) + .map_err(|e| DataFusionError::Execution(format!("create feather output temp: {e}")))?; + let out_handle = out_tmp + .reopen() + .map_err(|e| DataFusionError::Execution(format!("open feather output temp: {e}")))?; + let mut out = arrow_ipc::writer::FileWriter::try_new(out_handle, &self.schema) + .map_err(|e| DataFusionError::Execution(format!("init feather writer: {e}")))?; + + let ncols = self.schema.fields().len(); + let mut start = 0usize; + while start < n { + let end = (start + BUILD_CHUNK_ROWS).min(n); + // (batch, local) pairs for this chunk, in sorted-key order. + let pairs: Vec<(usize, usize)> = order[start..end] + .iter() + .map(|&gp| { + let b = offsets.partition_point(|&o| o <= gp) - 1; + (b, gp - offsets[b]) + }) + .collect(); + let cols: Vec = (0..ncols) + .map(|c| { + let arrays: Vec<&dyn Array> = + tbatches.iter().map(|b| b.column(c).as_ref()).collect(); + compute::interleave(&arrays, &pairs) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + }) + .collect::>()?; + let batch = RecordBatch::try_new(self.schema.clone(), cols) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + out.write(&batch) + .map_err(|e| DataFusionError::Execution(format!("write feather batch: {e}")))?; + start = end; + } + out.finish() + .map_err(|e| DataFusionError::Execution(format!("finalize feather sidecar: {e}")))?; + + // Atomic publish: rename the completed temp over the final path. 
+ out_tmp.persist(&self.final_path).map_err(|e| { + DataFusionError::Execution(format!("publish feather sidecar {}: {e}", self.final_path)) + })?; + + tracing::info!( + "Feather sidecar '{}' built: {} rows, input_already_sorted={}.", + self.final_path, + n, + self.input_sorted, + ); + + // `self.temp` (the spill) is removed when it drops at end of scope. + FeatherLookupProvider::open(&self.final_path) + } +} diff --git a/src/lib.rs b/src/lib.rs index a5f9eab..1a98efb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,6 +65,8 @@ pub mod rule; pub mod udf; pub mod udtf; +#[cfg(feature = "feather-provider")] +pub mod feather_provider; #[cfg(feature = "parquet-provider")] pub mod parquet_provider; #[cfg(feature = "sqlite-provider")] @@ -82,6 +84,8 @@ pub use rule::USearchRule; pub use udf::{cosine_distance_udf, l2_distance_udf, negative_dot_product_udf}; pub use udtf::VectorSearchVectorUDTF; +#[cfg(feature = "feather-provider")] +pub use feather_provider::{FeatherLookupProvider, FeatherSidecarBuilder}; #[cfg(feature = "parquet-provider")] pub use parquet_provider::ParquetLookupProvider; #[cfg(feature = "sqlite-provider")] diff --git a/tests/feather_provider_test.rs b/tests/feather_provider_test.rs new file mode 100644 index 0000000..4b193ce --- /dev/null +++ b/tests/feather_provider_test.rs @@ -0,0 +1,1001 @@ +#![cfg(feature = "feather-provider")] +//! Tests for `FeatherLookupProvider` / `FeatherSidecarBuilder` (ticket #702). +//! +//! Coverage: +//! - **Parity** (`#[cfg(feature = "sqlite-provider")]`): a Rust port of the +//! benchmark's `verify_same_data.py` — Feather returns the same rows as the +//! SQLite provider for sparse/holey rowids, including edges and absent keys. +//! - **Type fidelity**: Feather round-trips Decimal / FixedSizeList / Struct / +//! Dictionary / nested-List losslessly — the types `SqliteSidecarBuilder` +//! rejects at build time. +//! - **Projection**, **build-order / sort-agnosticism**, and an informational +//! 
/// Small deterministic SplitMix64 generator; keeps the tests reproducible
/// without pulling in a `rand` dependency.
struct Rng(u64);

impl Rng {
    /// Advance the state by the SplitMix64 increment and return the mixed output.
    fn next(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E3779B97F4A7C15);
        let stage0 = self.0;
        let stage1 = (stage0 ^ (stage0 >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
        let stage2 = (stage1 ^ (stage1 >> 27)).wrapping_mul(0x94D049BB133111EB);
        stage2 ^ (stage2 >> 31)
    }

    /// Next output reduced modulo `n`; `n` must be non-zero.
    fn below(&mut self, n: u64) -> u64 {
        let draw = self.next();
        draw % n
    }
}
+fn gen_payload(n: usize, chunk: usize, seed: u64) -> (SchemaRef, Vec, Vec) { + let schema = payload_schema(); + let mut rng = Rng(seed); + + let mut rowids: Vec = Vec::with_capacity(n); + let mut cur: i64 = 5; + for _ in 0..n { + cur += 1 + rng.below(4) as i64; // gap 1..=4 → sparse, strictly increasing + rowids.push(cur); + } + + let mut batches = Vec::new(); + for start in (0..n).step_by(chunk) { + let end = (start + chunk).min(n); + let rid: Vec = rowids[start..end].to_vec(); + let id: Vec = rid.iter().map(|r| r.wrapping_mul(7)).collect(); + let url: Vec> = rid + .iter() + .map(|r| Some(format!("https://example.com/doc/{r}"))) + .collect(); + let title: Vec> = rid + .iter() + .map(|r| (r % 7 != 0).then(|| format!("Title for row {r}"))) + .collect(); + let sha: Vec> = rid.iter().map(|r| Some(format!("{r:040x}"))).collect(); + let raw: Vec> = rid + .iter() + .map(|r| Some(format!("raw payload bytes for row {r}: {}", "x".repeat(40)))) + .collect(); + let filename: Vec> = + rid.iter().map(|r| Some(format!("file_{r}.txt"))).collect(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(rid)), + Arc::new(Int64Array::from(id)), + Arc::new(StringArray::from(url)), + Arc::new(StringArray::from(title)), + Arc::new(StringArray::from(sha)), + Arc::new(StringArray::from(raw)), + Arc::new(StringArray::from(filename)), + ], + ) + .unwrap(); + batches.push(batch); + } + (schema, batches, rowids) +} + +fn build_feather( + dir: &tempfile::TempDir, + schema: SchemaRef, + batches: &[RecordBatch], +) -> FeatherLookupProvider { + let path = dir.path().join("payload.feather"); + let value_cols: Vec = (1..schema.fields().len()).collect(); + let mut builder = + FeatherSidecarBuilder::begin(path.to_str().unwrap(), schema, 0, value_cols).unwrap(); + for b in batches { + builder.push_batch(b).unwrap(); + } + builder.finish().unwrap() +} + +fn fmt(batches: &[RecordBatch]) -> String { + pretty_format_batches(batches).unwrap().to_string() +} + +// ── 
A. Provider correctness & parity ─────────────────────────────────────────── + +#[cfg(feature = "sqlite-provider")] +fn build_sqlite( + dir: &tempfile::TempDir, + schema: SchemaRef, + batches: &[RecordBatch], +) -> datafusion_vector_search_ext::SqliteLookupProvider { + use datafusion_vector_search_ext::SqliteSidecarBuilder; + let path = dir.path().join("payload.db"); + let value_cols: Vec = (1..schema.fields().len()).collect(); + let mut builder = + SqliteSidecarBuilder::begin(path.to_str().unwrap(), "payload", 4, schema, 0, value_cols) + .unwrap(); + for b in batches { + builder.push_batch(b).unwrap(); + } + builder.finish().unwrap() +} + +#[cfg(feature = "sqlite-provider")] +#[tokio::test] +async fn parity_with_sqlite_sparse_rowids() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(3000, 256, 0xC0FFEE); + + let feather = build_feather(&dir, schema.clone(), &batches); + let sqlite = build_sqlite(&dir, schema.clone(), &batches); + + let n = rowids.len(); + let mut rng = Rng(42); + let pick = |rng: &mut Rng, count: usize| -> Vec { + (0..count) + .map(|_| rowids[rng.below(n as u64) as usize] as u64) + .collect() + }; + + // Build a battery of key sets covering scattered, edges, duplicates, absent. + let min = rowids[0] as u64; + let max = *rowids.last().unwrap() as u64; + let mid = rowids[n / 2] as u64; + + let mut key_sets: Vec<(&str, Vec)> = vec![ + ("empty", vec![]), + ("smallest", vec![min]), + ("largest", vec![max]), + ("mid", vec![mid]), + ("edges_together", vec![min, mid, max]), + ("duplicates", vec![mid, mid, min, min, max]), + // Absent: below min, above max, and a value in a gap between two rowids. 
+ ("absent_below", vec![0]), + ("absent_above", vec![max + 1000]), + ("absent_in_gap", vec![rowids[10] as u64 + 1]), + ("mixed_present_absent", vec![min, 0, mid, max + 1000, max]), + ]; + for k in [1usize, 10, 100, 1000] { + key_sets.push(("scattered", pick(&mut rng, k))); + } + + for (label, keys) in &key_sets { + let f = feather.fetch_by_keys(keys, "rowid", None).await.unwrap(); + let s = sqlite.fetch_by_keys(keys, "rowid", None).await.unwrap(); + assert_eq!( + fmt(&f), + fmt(&s), + "feather vs sqlite mismatch for key set '{label}' ({} keys)", + keys.len() + ); + } +} + +#[cfg(feature = "sqlite-provider")] +#[tokio::test] +async fn parity_with_sqlite_projection() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(1500, 512, 7); + let feather = build_feather(&dir, schema.clone(), &batches); + let sqlite = build_sqlite(&dir, schema.clone(), &batches); + + let keys: Vec = rowids.iter().step_by(13).map(|&r| r as u64).collect(); + + // narrow (rowid, title, filename) and full + a reordered projection. + for proj in [ + vec![0usize, 3, 6], + (0..schema.fields().len()).collect::>(), + vec![3, 0, 6, 1], + ] { + let f = feather + .fetch_by_keys(&keys, "rowid", Some(&proj)) + .await + .unwrap(); + let s = sqlite + .fetch_by_keys(&keys, "rowid", Some(&proj)) + .await + .unwrap(); + assert_eq!(fmt(&f), fmt(&s), "projection {proj:?} mismatch"); + // Only the projected columns are present, in projection order. + assert_eq!(f[0].num_columns(), proj.len()); + for (out_i, &src_i) in proj.iter().enumerate() { + assert_eq!( + f[0].schema().field(out_i).name(), + schema.field(src_i).name() + ); + } + } +} + +/// The exact-match guard: an absent key (one that falls between two stored +/// rowids, or below/above the range) must never alias onto a neighbour. After +/// hydration the returned rowid column equals exactly the present, sorted, +/// deduped requested keys. 
+#[tokio::test] +async fn binary_search_lands_exactly() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(2000, 256, 99); + let feather = build_feather(&dir, schema, &batches); + + let present: Vec = rowids.iter().step_by(7).map(|&r| r as u64).collect(); + let absent: Vec = rowids + .iter() + .step_by(7) + .map(|&r| r as u64 + 1) // gaps are ≥1, so +1 may or may not exist; filter below + .filter(|k| rowids.binary_search(&(*k as i64)).is_err()) + .collect(); + + let mut request = present.clone(); + request.extend(&absent); + request.push(0); + request.push(*rowids.last().unwrap() as u64 + 9999); + + let out = feather + .fetch_by_keys(&request, "rowid", None) + .await + .unwrap(); + let got: Vec = out[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(); + + let mut expected: Vec = present.iter().map(|&k| k as i64).collect(); + expected.sort_unstable(); + expected.dedup(); + assert_eq!(got, expected, "binary search aliased an absent key"); +} + +#[tokio::test] +async fn empty_and_all_absent_return_no_batches() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(500, 128, 3); + let feather = build_feather(&dir, schema, &batches); + + assert!( + feather + .fetch_by_keys(&[], "rowid", None) + .await + .unwrap() + .is_empty() + ); + let absent = vec![0u64, 1, *rowids.last().unwrap() as u64 + 1_000_000]; + assert!( + feather + .fetch_by_keys(&absent, "rowid", None) + .await + .unwrap() + .is_empty() + ); +} + +// ── B. Type fidelity ──────────────────────────────────────────────────────────── + +/// Build a payload with types `SqliteSidecarBuilder` rejects, and confirm Feather +/// round-trips them losslessly — same Arrow types out, same values. 
+#[tokio::test] +async fn type_fidelity_roundtrip() { + let dir = tempdir().unwrap(); + + let struct_fields = Fields::from(vec![ + Field::new("lat", DataType::Float64, false), + Field::new("lon", DataType::Float64, false), + ]); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("price", DataType::Decimal128(20, 4), false), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Microsecond, Some("America/New_York".into())), + false, + ), + Field::new( + "embedding3", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float64, true)), 3), + false, + ), + Field::new("loc", DataType::Struct(struct_fields.clone()), false), + Field::new( + "tag", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + ), + Field::new( + "scores", + DataType::List(Arc::new(Field::new("item", DataType::Float64, true))), + true, + ), + ])); + + let rowid = Int64Array::from(vec![10_i64, 20, 30]); + let price = Decimal128Array::from(vec![12345_i128, -98765, 0]) + .with_precision_and_scale(20, 4) + .unwrap(); + let ts = TimestampMicrosecondArray::from(vec![1_000_000_i64, 2_000_000, 3_000_000]) + .with_timezone("America/New_York"); + + let mut fsl = FixedSizeListBuilder::new(Float64Builder::new(), 3); + for triple in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] { + for v in triple { + fsl.values().append_value(v); + } + fsl.append(true); + } + let embedding3 = fsl.finish(); + + let loc = StructArray::from(vec![ + ( + Arc::new(Field::new("lat", DataType::Float64, false)), + Arc::new(arrow_array::Float64Array::from(vec![40.0, 41.0, 42.0])) as ArrayRef, + ), + ( + Arc::new(Field::new("lon", DataType::Float64, false)), + Arc::new(arrow_array::Float64Array::from(vec![-74.0, -75.0, -76.0])) as ArrayRef, + ), + ]); + + let tag: DictionaryArray = vec![Some("nlp"), Some("vision"), Some("nlp")] + .into_iter() + .collect(); + + let mut scores = ListBuilder::new(Float64Builder::new()); + 
scores.values().append_value(0.1); + scores.values().append_value(0.2); + scores.append(true); + scores.values().append_value(0.3); + scores.append(true); + scores.append(false); // null list + let scores = scores.finish(); + + let input = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(rowid), + Arc::new(price), + Arc::new(ts), + Arc::new(embedding3), + Arc::new(loc), + Arc::new(tag), + Arc::new(scores), + ], + ) + .unwrap(); + + let feather = build_feather(&dir, schema.clone(), std::slice::from_ref(&input)); + let out = feather + .fetch_by_keys(&[10, 20, 30], "rowid", None) + .await + .unwrap(); + + // Types preserved exactly (not coerced to i64/JSON/TEXT). + for i in 0..schema.fields().len() { + assert_eq!( + out[0].schema().field(i).data_type(), + schema.field(i).data_type(), + "type of column '{}' was not preserved", + schema.field(i).name() + ); + } + // Values preserved exactly (input is already key-sorted, so rows align). + assert_eq!(fmt(&out), fmt(std::slice::from_ref(&input))); +} + +/// The contrast: `SqliteSidecarBuilder::begin` rejects each of these payload +/// types at build time (validated against its supported set), whereas Feather +/// accepts them (proven above). 
+#[cfg(feature = "sqlite-provider")] +#[test] +fn sqlite_rejects_types_feather_accepts() { + use datafusion_vector_search_ext::SqliteSidecarBuilder; + let dir = tempdir().unwrap(); + + let exotic: Vec<(&str, DataType)> = vec![ + ("decimal", DataType::Decimal128(20, 4)), + ( + "fixed_size_list", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float64, true)), 3), + ), + ( + "struct", + DataType::Struct(Fields::from(vec![Field::new( + "a", + DataType::Float64, + false, + )])), + ), + ( + "dictionary", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + ), + ( + "list_of_float", + DataType::List(Arc::new(Field::new("item", DataType::Float64, true))), + ), + ]; + + for (name, dt) in exotic { + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new(name, dt, true), + ])); + let db = dir.path().join(format!("{name}.db")); + let res = + SqliteSidecarBuilder::begin(db.to_str().unwrap(), "t", 1, schema.clone(), 0, vec![1]); + assert!( + res.is_err(), + "expected SQLite to reject payload type for column '{name}'" + ); + + // Feather accepts the same schema at begin. + let feather_path = dir.path().join(format!("{name}.feather")); + assert!( + FeatherSidecarBuilder::begin(feather_path.to_str().unwrap(), schema, 0, vec![1]) + .is_ok(), + "expected Feather to accept payload type for column '{name}'" + ); + } +} + +// ── D/E. Build path: sort-agnosticism & ordering ──────────────────────────────── + +/// The builder must produce a sorted-by-rowid file regardless of input order, +/// and report whether the input was already sorted. +#[tokio::test] +async fn builder_is_sort_agnostic() { + let dir = tempdir().unwrap(); + let (schema, sorted_batches, rowids) = gen_payload(1000, 100, 5); + + // Sorted input → reported sorted; output sorted. 
+ let path_sorted = dir.path().join("sorted.feather"); + let value_cols: Vec = (1..schema.fields().len()).collect(); + let mut b = FeatherSidecarBuilder::begin( + path_sorted.to_str().unwrap(), + schema.clone(), + 0, + value_cols.clone(), + ) + .unwrap(); + for batch in &sorted_batches { + b.push_batch(batch).unwrap(); + } + assert!(b.input_was_sorted(), "ascending input should report sorted"); + let p_sorted = b.finish().unwrap(); + + // Shuffled input (reversed batch order + reversed rows) → reported unsorted, + // but output is still correctly sorted and identical to the sorted build. + let path_shuf = dir.path().join("shuffled.feather"); + let mut b2 = + FeatherSidecarBuilder::begin(path_shuf.to_str().unwrap(), schema.clone(), 0, value_cols) + .unwrap(); + for batch in sorted_batches.iter().rev() { + b2.push_batch(&reverse_rows(batch)).unwrap(); + } + assert!( + !b2.input_was_sorted(), + "shuffled input should report unsorted" + ); + let p_shuf = b2.finish().unwrap(); + + let all: Vec = rowids.iter().map(|&r| r as u64).collect(); + let from_sorted = p_sorted.fetch_by_keys(&all, "rowid", None).await.unwrap(); + let from_shuf = p_shuf.fetch_by_keys(&all, "rowid", None).await.unwrap(); + assert_eq!( + fmt(&from_sorted), + fmt(&from_shuf), + "sort-agnostic build produced different output for shuffled input" + ); + + // And the keys really are ascending. + let keys: Vec = from_shuf[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(); + assert!( + keys.windows(2).all(|w| w[0] < w[1]), + "output not sorted ascending" + ); +} + +/// Reverse the row order of a batch (helper for the shuffle test). 
+fn reverse_rows(batch: &RecordBatch) -> RecordBatch { + let n = batch.num_rows(); + let idx = arrow_array::UInt32Array::from((0..n).rev().map(|i| i as u32).collect::>()); + let cols: Vec = batch + .columns() + .iter() + .map(|c| datafusion::arrow::compute::take(c.as_ref(), &idx, None).unwrap()) + .collect(); + RecordBatch::try_new(batch.schema(), cols).unwrap() +} + +#[tokio::test] +async fn reopen_from_file_roundtrips() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(800, 200, 11); + let path = dir.path().join("reopen.feather"); + let value_cols: Vec = (1..schema.fields().len()).collect(); + let mut b = + FeatherSidecarBuilder::begin(path.to_str().unwrap(), schema, 0, value_cols).unwrap(); + for batch in &batches { + b.push_batch(batch).unwrap(); + } + let from_builder = b.finish().unwrap(); + + // Open a fresh provider straight off the published file (the replica path). + let reopened = FeatherLookupProvider::open(path.to_str().unwrap()).unwrap(); + assert_eq!(reopened.len(), rowids.len()); + + let keys: Vec = rowids.iter().step_by(5).map(|&r| r as u64).collect(); + let a = from_builder + .fetch_by_keys(&keys, "rowid", None) + .await + .unwrap(); + let c = reopened.fetch_by_keys(&keys, "rowid", None).await.unwrap(); + assert_eq!(fmt(&a), fmt(&c)); +} + +// ── E/F. 
Footprint & build-time (informational) ───────────────────────────────── + +#[cfg(feature = "sqlite-provider")] +#[tokio::test] +async fn footprint_and_build_time_vs_sqlite() { + use std::time::Instant; + let dir = tempdir().unwrap(); + let (schema, batches, _rowids) = gen_payload(50_000, 2048, 1234); + + let t0 = Instant::now(); + let _f = build_feather(&dir, schema.clone(), &batches); + let feather_build = t0.elapsed(); + + let t1 = Instant::now(); + let _s = build_sqlite(&dir, schema.clone(), &batches); + let sqlite_build = t1.elapsed(); + + let feather_bytes = std::fs::metadata(dir.path().join("payload.feather")) + .unwrap() + .len(); + let sqlite_bytes = std::fs::metadata(dir.path().join("payload.db")) + .unwrap() + .len(); + + println!( + "[#702 footprint] rows=50000 feather={feather_bytes} bytes ({:.1} MiB) \ + sqlite={sqlite_bytes} bytes ({:.1} MiB) ratio(feather/sqlite)={:.2}", + feather_bytes as f64 / 1048576.0, + sqlite_bytes as f64 / 1048576.0, + feather_bytes as f64 / sqlite_bytes as f64, + ); + println!( + "[#702 build-time] feather={feather_build:?} sqlite={sqlite_build:?} \ + speedup(sqlite/feather)={:.1}x", + sqlite_build.as_secs_f64() / feather_build.as_secs_f64().max(1e-9), + ); + + // Sanity only (not a perf gate): both files exist and are non-trivial. + assert!(feather_bytes > 0 && sqlite_bytes > 0); +} + +// ── open() error paths & robustness ───────────────────────────────────────────── + +/// Write a batch to a `.feather` file directly (bypassing the builder's sort), so +/// we can construct deliberately-malformed sidecars for the rejection tests. 
+fn write_raw_feather(path: &str, batch: &RecordBatch) { + let file = std::fs::File::create(path).unwrap(); + let mut w = arrow_ipc::writer::FileWriter::try_new(file, &batch.schema()).unwrap(); + w.write(batch).unwrap(); + w.finish().unwrap(); +} + +/// `open` must fail loudly (not silently return wrong rows) when the stored key +/// column is not strictly ascending — both the unsorted and duplicate-key cases. +/// This guards the "fail at open, not at query time" contract the binary search +/// depends on. +#[test] +fn open_rejects_unsorted_or_duplicate_keys() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + + let unsorted = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![10_i64, 3, 1])), + Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])), + ], + ) + .unwrap(); + let p1 = dir.path().join("unsorted.feather"); + write_raw_feather(p1.to_str().unwrap(), &unsorted); + assert!( + FeatherLookupProvider::open(p1.to_str().unwrap()).is_err(), + "descending key column must be rejected" + ); + + // Duplicate keys must also be rejected (mirrors SQLite's INTEGER PRIMARY KEY). + let dup = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int64Array::from(vec![1_i64, 1, 2])), + Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])), + ], + ) + .unwrap(); + let p2 = dir.path().join("dup.feather"); + write_raw_feather(p2.to_str().unwrap(), &dup); + assert!( + FeatherLookupProvider::open(p2.to_str().unwrap()).is_err(), + "duplicate keys must be rejected" + ); +} + +/// `open` returns an error (never panics) on a non-Arrow file and on a missing +/// file, rather than surfacing a corrupt provider. 
+#[test] +fn open_rejects_malformed_or_missing_file() { + let dir = tempdir().unwrap(); + let garbage = dir.path().join("garbage.feather"); + std::fs::write(&garbage, b"this is not an arrow ipc file").unwrap(); + assert!(FeatherLookupProvider::open(garbage.to_str().unwrap()).is_err()); + + let missing = dir.path().join("does_not_exist.feather"); + assert!(FeatherLookupProvider::open(missing.to_str().unwrap()).is_err()); +} + +/// Regression for the build-sort domain fix: a genuinely-ascending Int64 key +/// column containing negative values must BUILD (not be rejected as "corrupt") +/// and round-trip via the u64 lookup domain the engine uses for rowids. Before +/// the fix, `finish()` sorted by signed Arrow order while the verify/search used +/// u64 order, so this input was spuriously rejected. +#[tokio::test] +async fn signed_negative_keys_build_and_query() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![-5_i64, -1, 3, 10])), + Arc::new(StringArray::from(vec![ + Some("neg5"), + Some("neg1"), + Some("three"), + Some("ten"), + ])), + ], + ) + .unwrap(); + let path = dir.path().join("neg.feather"); + let mut b = FeatherSidecarBuilder::begin(path.to_str().unwrap(), schema, 0, vec![1]).unwrap(); + b.push_batch(&batch).unwrap(); + let provider = b + .finish() + .expect("negative-but-ascending Int64 keys must build"); + + // The engine passes rowids as u64; a negative i64 arrives as its u64 reinterpretation. 
+ let out = provider + .fetch_by_keys(&[(-5_i64) as u64, 3], "rowid", None) + .await + .unwrap(); + let names: Vec = out + .iter() + .flat_map(|bch| { + bch.column(1) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .map(|s| s.to_string()) + }) + .collect(); + assert_eq!(names.len(), 2); + assert!(names.contains(&"neg5".to_string()) && names.contains(&"three".to_string())); +} + +/// UInt64 key column (the other common rowid type) round-trips. Only Int64 was +/// covered before; this exercises the UInt64 arm of `extract_keys_as_u64`. +#[tokio::test] +async fn uint64_keys_roundtrip() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::UInt64, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt64Array::from(vec![5_u64, 50, 500, 5000])), + Arc::new(StringArray::from(vec![ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + ])), + ], + ) + .unwrap(); + let path = dir.path().join("u64.feather"); + let mut b = FeatherSidecarBuilder::begin(path.to_str().unwrap(), schema, 0, vec![1]).unwrap(); + b.push_batch(&batch).unwrap(); + let provider = b.finish().unwrap(); + + let out = provider + .fetch_by_keys(&[50, 5000], "rowid", None) + .await + .unwrap(); + let names: Vec = out[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .map(|s| s.to_string()) + .collect(); + assert_eq!(names, vec!["b".to_string(), "d".to_string()]); +} + +/// The `TableProvider::scan` path (used for SQL column-name resolution) actually +/// returns rows — a working MemTable scan, unlike the parquet sibling which is +/// NotImplemented. Guards against the silent zero-row scan that bit parquet. 
+#[tokio::test] +async fn table_provider_scan_roundtrips() { + use datafusion::catalog::TableProvider; + use datafusion::prelude::SessionContext; + + let dir = tempdir().unwrap(); + let (schema, batches, _rowids) = gen_payload(300, 128, 21); + let provider = build_feather(&dir, schema, &batches); + let n = provider.len(); + + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::new(provider) as Arc) + .unwrap(); + + let all = ctx + .sql("SELECT rowid, title FROM t") + .await + .unwrap() + .collect() + .await + .unwrap(); + let total: usize = all.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total, n, "full scan should see every row"); + assert_eq!( + all[0].num_columns(), + 2, + "projected scan returns only 2 columns" + ); +} + +// ── Multi-batch (spill-sort + coarse-index) ───────────────────────────────────── + +/// Build > BUILD_CHUNK_ROWS rows from fully-unsorted input so the spill-sort runs +/// and the final file is multi-batch, then check fetch parity vs SQLite — with +/// keys straddling the internal batch boundary to exercise the coarse index + +/// cross-batch gather. +#[cfg(feature = "sqlite-provider")] +#[tokio::test] +async fn parity_multibatch_shuffled_build() { + let dir = tempdir().unwrap(); + let (schema, batches, rowids) = gen_payload(20_000, 1024, 0xABCD); + + // Feather: push batches reversed + each batch's rows reversed → spill is fully + // unsorted, forcing the external sort to do real work. + let fpath = dir.path().join("mb.feather"); + let value_cols: Vec = (1..schema.fields().len()).collect(); + let mut fb = + FeatherSidecarBuilder::begin(fpath.to_str().unwrap(), schema.clone(), 0, value_cols) + .unwrap(); + for b in batches.iter().rev() { + fb.push_batch(&reverse_rows(b)).unwrap(); + } + assert!( + !fb.input_was_sorted(), + "reversed input should report unsorted" + ); + let feather = fb.finish().unwrap(); + assert_eq!(feather.len(), rowids.len()); + + // SQLite oracle (order-independent B-tree). 
+ let sqlite = build_sqlite(&dir, schema.clone(), &batches); + + let n = rowids.len(); + let mut rng = Rng(7); + let mut keys: Vec = (0..1500) + .map(|_| rowids[rng.below(n as u64) as usize] as u64) + .collect(); + // Straddle the first internal batch boundary (sorted chunk size 8192). + for &i in &[0usize, 8191, 8192, 8193, n - 1] { + keys.push(rowids[i] as u64); + } + + let f = feather.fetch_by_keys(&keys, "rowid", None).await.unwrap(); + let s = sqlite.fetch_by_keys(&keys, "rowid", None).await.unwrap(); + assert_eq!( + fmt(&f), + fmt(&s), + "multi-batch shuffled-build parity mismatch" + ); + + // Projected fetch across the boundary too. + let proj = vec![0usize, 3, 6]; + let f = feather + .fetch_by_keys(&keys, "rowid", Some(&proj)) + .await + .unwrap(); + let s = sqlite + .fetch_by_keys(&keys, "rowid", Some(&proj)) + .await + .unwrap(); + assert_eq!(fmt(&f), fmt(&s), "multi-batch projected parity mismatch"); +} + +// ── Regression + edge cases for the mmap + spill-sort rewrite ──────────────────── + +/// `open` must return an error (not panic) on a file whose IPC trailer claims a +/// footer larger than the file — the corrupt-footer guard. +#[test] +fn open_rejects_corrupt_footer_length() { + let dir = tempdir().unwrap(); + let p = dir.path().join("badfooter.feather"); + // 32 bytes: padding, then a valid 10-byte trailer [footer_len i32 LE][b"ARROW1"] + // with an oversized footer_len. + let mut bytes = vec![0u8; 22]; + bytes.extend_from_slice(&1_000_000_i32.to_le_bytes()); + bytes.extend_from_slice(b"ARROW1"); + std::fs::write(&p, &bytes).unwrap(); + assert!( + FeatherLookupProvider::open(p.to_str().unwrap()).is_err(), + "oversized footer_len must error, not panic" + ); +} + +/// A build with zero pushed rows yields a valid, empty, re-openable sidecar. 
+#[tokio::test] +async fn empty_build_is_valid() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + let p = dir.path().join("empty.feather"); + let builder = FeatherSidecarBuilder::begin(p.to_str().unwrap(), schema, 0, vec![1]).unwrap(); + let provider = builder.finish().unwrap(); + assert!(provider.is_empty()); + assert!( + provider + .fetch_by_keys(&[1, 2, 3], "rowid", None) + .await + .unwrap() + .is_empty() + ); + let reopened = FeatherLookupProvider::open(p.to_str().unwrap()).unwrap(); + assert_eq!(reopened.len(), 0); +} + +/// Single-row build (smallest non-empty case). +#[tokio::test] +async fn single_row_build() { + let dir = tempdir().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![42_i64])), + Arc::new(StringArray::from(vec![Some("answer")])), + ], + ) + .unwrap(); + let p = dir.path().join("one.feather"); + let mut b = FeatherSidecarBuilder::begin(p.to_str().unwrap(), schema, 0, vec![1]).unwrap(); + b.push_batch(&batch).unwrap(); + let provider = b.finish().unwrap(); + assert_eq!(provider.len(), 1); + let out = provider.fetch_by_keys(&[42], "rowid", None).await.unwrap(); + assert_eq!(out[0].num_rows(), 1); + assert!( + provider + .fetch_by_keys(&[7], "rowid", None) + .await + .unwrap() + .is_empty() + ); +} + +/// Build exactly 2× BUILD_CHUNK_ROWS (16384) rows so the gather hits an exact +/// chunk boundary, and check parity vs SQLite for keys straddling it. 
#[cfg(feature = "sqlite-provider")]
#[tokio::test]
async fn build_at_exact_chunk_boundary() {
    let dir = tempdir().unwrap();
    let (schema, batches, rowids) = gen_payload(16_384, 4096, 0xBEEF);
    let feather = build_feather(&dir, schema.clone(), &batches);
    let sqlite = build_sqlite(&dir, schema.clone(), &batches);
    assert_eq!(feather.len(), 16_384);

    // First/last rows plus the three positions around index 8192.
    let mut keys: Vec<u64> = [0usize, 8191, 8192, 8193, 16_383]
        .iter()
        .map(|&i| rowids[i] as u64)
        .collect();
    let mut rng = Rng(1);
    for _ in 0..500 {
        keys.push(rowids[rng.below(16_384) as usize] as u64);
    }
    let f = feather.fetch_by_keys(&keys, "rowid", None).await.unwrap();
    let s = sqlite.fetch_by_keys(&keys, "rowid", None).await.unwrap();
    assert_eq!(fmt(&f), fmt(&s), "chunk-boundary parity mismatch");
}

/// Duplicate keys split across separate push_batch calls must be rejected at
/// finish (the dup check runs over the global sorted order, not per-batch).
#[test]
fn duplicate_keys_across_pushes_rejected() {
    let dir = tempdir().unwrap();
    let schema = Arc::new(Schema::new(vec![
        Field::new("rowid", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]));
    // Small factory for two-column (rowid, name) batches over the shared schema.
    let mk = |rid: Vec<i64>, names: Vec<&'static str>| {
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(rid)),
                Arc::new(StringArray::from(
                    names.into_iter().map(Some).collect::<Vec<_>>(),
                )),
            ],
        )
        .unwrap()
    };
    let p = dir.path().join("dup.feather");
    let mut b =
        FeatherSidecarBuilder::begin(p.to_str().unwrap(), schema.clone(), 0, vec![1]).unwrap();
    b.push_batch(&mk(vec![1, 2], vec!["a", "b"])).unwrap();
    b.push_batch(&mk(vec![2, 3], vec!["c", "d"])).unwrap(); // key 2 duplicated across pushes
    assert!(
        b.finish().is_err(),
        "duplicate key across pushes must be rejected"
    );
}