From aa3f3d5932396063d26cd916f9caa907e5ba7ca2 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Thu, 16 Apr 2026 17:57:22 -0400 Subject: [PATCH] vector search scan benchmark Signed-off-by: Connor Tsui --- .../vector-search-bench/src/compression.rs | 2 - benchmarks/vector-search-bench/src/display.rs | 133 +++++++++++++ benchmarks/vector-search-bench/src/ingest.rs | 96 +++++---- benchmarks/vector-search-bench/src/lib.rs | 3 + benchmarks/vector-search-bench/src/main.rs | 188 ++++++++++++++++++ benchmarks/vector-search-bench/src/prepare.rs | 48 ++--- benchmarks/vector-search-bench/src/query.rs | 120 +++++++++++ benchmarks/vector-search-bench/src/scan.rs | 177 +++++++++++++++++ 8 files changed, 690 insertions(+), 77 deletions(-) create mode 100644 benchmarks/vector-search-bench/src/display.rs create mode 100644 benchmarks/vector-search-bench/src/main.rs create mode 100644 benchmarks/vector-search-bench/src/query.rs create mode 100644 benchmarks/vector-search-bench/src/scan.rs diff --git a/benchmarks/vector-search-bench/src/compression.rs b/benchmarks/vector-search-bench/src/compression.rs index 65bf080db85..9b40cb15544 100644 --- a/benchmarks/vector-search-bench/src/compression.rs +++ b/benchmarks/vector-search-bench/src/compression.rs @@ -8,8 +8,6 @@ //! //! The benchmark writes one `.vortex` file per flavor per data file, then scans them all with the //! same query so the comparison is apples-to-apples with the Parquet files. -//! -//! Note that the handrolled `&[f32]` parquet baseline is **not** a flavor here. use clap::ValueEnum; use vortex::array::ArrayId; diff --git a/benchmarks/vector-search-bench/src/display.rs b/benchmarks/vector-search-bench/src/display.rs new file mode 100644 index 00000000000..a74eec46b01 --- /dev/null +++ b/benchmarks/vector-search-bench/src/display.rs @@ -0,0 +1,133 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Local table renderer for the vector-search benchmark. +//! +//! 
Groups columns by **flavor** (`vortex-uncompressed`, `vortex-turboquant`) rather than by +//! [`vortex_bench::Format`], because the two Vortex flavors share a single +//! `Format::OnDiskVortex`/`Format::VortexLossy` pair and the generic +//! [`vortex_bench::display::render_table`] groups by Format. Local renderer keeps the +//! column-per-flavor invariant intact without introducing a new global Format value. +//! +//! Output rows: +//! +//! ```text +//! Metric | vortex-uncompressed | vortex-turboquant +//! ------------------ + ------------------- + ----------------- +//! scan wall (mean) | 485 ms | 212 ms +//! scan wall (median) | 490 ms | 215 ms +//! matches | 42 | 39 +//! rows scanned | 10,000,000 | 10,000,000 +//! bytes scanned | 30.5 GB | 7.62 GB +//! rows / sec | 5.2e6 | 1.2e7 +//! ``` + +use std::io::Write; + +use anyhow::Result; +use tabled::settings::Style; + +use crate::compression::VectorFlavor; +use crate::prepare::CompressedVortexDataset; +use crate::scan::ScanTiming; + +/// Final column-per-flavor row set for one dataset. +pub struct DatasetReport<'a> { + pub dataset_name: &'a str, + pub vortex_results: &'a [(VectorFlavor, &'a CompressedVortexDataset, &'a ScanTiming)], +} + +/// Render the full report into the given writer as a tabled table. +pub fn render(report: &DatasetReport<'_>, writer: &mut dyn Write) -> Result<()> { + let mut headers: Vec = vec!["metric".to_owned()]; + for &(flavor, ..) 
in report.vortex_results { + headers.push(flavor.label().to_owned()); + } + + let rows: Vec> = vec![ + make_row("scan wall (mean)", report, |_, _, scan| { + format_duration(scan.mean) + }), + make_row("scan wall (median)", report, |_, _, scan| { + format_duration(scan.median) + }), + make_row("matches", report, |_, _, scan| scan.matches.to_string()), + make_row("rows scanned", report, |_, _, scan| { + scan.rows_scanned.to_string() + }), + make_row("bytes scanned", report, |_, _, scan| { + format_bytes(scan.bytes_scanned) + }), + make_row("rows / sec", report, |_, _, scan| { + format_throughput_rows(scan.rows_scanned, scan.mean) + }), + ]; + + writeln!(writer, "## {}", report.dataset_name)?; + let mut builder = tabled::builder::Builder::new(); + builder.push_record(headers); + for row in rows { + builder.push_record(row); + } + let mut table = builder.build(); + table.with(Style::modern()); + writeln!(writer, "{table}")?; + Ok(()) +} + +fn make_row(metric: &str, report: &DatasetReport<'_>, vortex_cell: F) -> Vec +where + F: Fn(VectorFlavor, &CompressedVortexDataset, &ScanTiming) -> String, +{ + let mut row = vec![metric.to_owned()]; + for &(flavor, prep, scan) in report.vortex_results { + row.push(vortex_cell(flavor, prep, scan)); + } + row +} + +fn format_duration(d: std::time::Duration) -> String { + let secs = d.as_secs_f64(); + if secs >= 1.0 { + format!("{secs:.2} s") + } else if secs >= 1e-3 { + format!("{:.1} ms", secs * 1e3) + } else { + format!("{:.1} µs", secs * 1e6) + } +} + +fn format_bytes(bytes: u64) -> String { + const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB"]; + let mut value = bytes as f64; + let mut unit = UNITS[0]; + for next in &UNITS[1..] 
{ + if value < 1024.0 { + break; + } + value /= 1024.0; + unit = next; + } + if unit == "B" { + format!("{bytes} B") + } else { + format!("{value:.2} {unit}") + } +} + +fn format_throughput_rows(rows: u64, wall: std::time::Duration) -> String { + let secs = wall.as_secs_f64(); + if secs <= 0.0 { + return "—".to_owned(); + } + let rps = rows as f64 / secs; + if rps >= 1e9 { + format!("{:.2}G", rps / 1e9) + } else if rps >= 1e6 { + format!("{:.2}M", rps / 1e6) + } else if rps >= 1e3 { + format!("{:.2}K", rps / 1e3) + } else { + format!("{rps:.0}") + } +} diff --git a/benchmarks/vector-search-bench/src/ingest.rs b/benchmarks/vector-search-bench/src/ingest.rs index 48904530170..f3fe04f4422 100644 --- a/benchmarks/vector-search-bench/src/ingest.rs +++ b/benchmarks/vector-search-bench/src/ingest.rs @@ -8,7 +8,9 @@ //! 1. Project the `emb` column out of each struct chunk. //! 2. Rewrap the `emb` column as `Extension>` via //! [`vortex_bench::vector_dataset::list_to_vector_ext`]. -//! 3. Cast the FSL element buffer from `f64` -> `f32` if the source is `f64`. After this point all +//! 3. Detect the FSL element ptype at runtime and cast `f64` -> `f32` when needed. Detection is +//! from the arrow schema rather than a catalog declaration so upstream parquets whose actual +//! precision disagrees with the catalog still ingest correctly. After this point all //! downstream code (compression, scan, recall) is f32-only. //! 4. Optionally project the `scalar_labels` column through unchanged so future filtered-search //! benchmarks have it without re-ingest. @@ -39,48 +41,50 @@ use vortex_bench::vector_dataset::list_to_vector_ext; use vortex_tensor::vector::AnyVector; use vortex_tensor::vector::Vector; -/// Configuration passed alongside each chunk so the transform can stay stateless. -#[derive(Debug, Clone, Copy)] -pub struct ChunkTransform { - /// Source element ptype as declared by the dataset catalog. Used purely to decide whether the - /// f64 -> f32 cast is needed. 
- pub src_ptype: PType, - // /// Whether to project the `scalar_labels` column through the output struct. - // pub include_scalar_labels: bool, -} +/// Apply the transform to a single struct chunk and return the rebuilt chunk. +/// +/// `chunk` must be a non-chunked `Struct { id: i64, emb: List }`, where all of the list +/// elements are f32 or f64 primitives. +/// +/// The returned array is always a `Struct { id: i64, emb: Vector }`. +pub fn transform_chunk(chunk: ArrayRef, ctx: &mut ExecutionCtx) -> Result { + let struct_view = chunk + .as_opt::() + .with_context(|| format!("ingest: expected struct chunk, got dtype {}", chunk.dtype()))?; + + let id = struct_view + .unmasked_field_by_name("id") + .context("ingest: chunk missing `id` column")? + .clone(); + let emb = struct_view + .unmasked_field_by_name("emb") + .context("ingest: chunk missing `emb` column")? + .clone(); + + let emb_ext: ExtensionArray = list_to_vector_ext(emb)?.execute(ctx)?; + + // Detect the actual FSL element ptype from the extension storage dtype. The dataset catalog + // cannot be trusted here: at least one upstream parquet (`sift-medium-5m`) ships f64 + // embeddings despite the catalog advertising f32. + let element_ptype = { + let storage_dtype = emb_ext.storage_array().dtype(); + match storage_dtype { + DType::FixedSizeList(elem, ..) => match elem.as_ref() { + DType::Primitive(ptype, _) => *ptype, + other => bail!("ingest: expected primitive FSL element dtype, got {other}"), + }, + other => bail!("ingest: expected FSL storage dtype, got {other}"), + } + }; -impl ChunkTransform { - /// Apply the transform to a single struct chunk and return the rebuilt chunk. - /// - /// `chunk` must be a non-chunked `Struct { id: i64, emb: List }`, where all of the list - /// elements are - /// - /// The returned array is always a `Struct { id: i64, emb: Vector }`.
- pub fn apply(&self, chunk: ArrayRef, ctx: &mut ExecutionCtx) -> Result { - let struct_view = chunk.as_opt::().with_context(|| { - format!("ingest: expected struct chunk, got dtype {}", chunk.dtype()) - })?; - - let id = struct_view - .unmasked_field_by_name("id") - .context("ingest: chunk missing `id` column")? - .clone(); - let emb = struct_view - .unmasked_field_by_name("emb") - .context("ingest: chunk missing `emb` column")? - .clone(); - - let emb_ext: ExtensionArray = list_to_vector_ext(emb)?.execute(ctx)?; - - let f32_vector_array = if self.src_ptype == PType::F64 { - convert_f64_to_f32_vectors(&emb_ext, ctx)? - } else { - emb_ext.into_array() - }; + let f32_vector_array = match element_ptype { + PType::F32 => emb_ext.into_array(), + PType::F64 => convert_f64_to_f32_vectors(&emb_ext, ctx)?, + other => bail!("ingest: unsupported emb element ptype {other}, expected f32 or f64"), + }; - let fields = [("id", id), ("emb", f32_vector_array)]; - Ok(StructArray::from_fields(&fields)?.into_array()) - } + let fields = [("id", id), ("emb", f32_vector_array)]; + Ok(StructArray::from_fields(&fields)?.into_array()) } /// Convert a `Vector` extension array down to `Vector`. 
@@ -164,10 +168,7 @@ mod tests { let emb = list_chunk_f64(&[&[1.0, 2.0, 3.0], &[4.0, 5.0, 6.0]]); let chunk = StructArray::from_fields(&[("id", id_array(&[0, 1])), ("emb", emb)])?.into_array(); - let transform = ChunkTransform { - src_ptype: PType::F64, - }; - let out = transform.apply(chunk, &mut ctx)?; + let out = transform_chunk(chunk, &mut ctx)?; let out_struct = out.as_opt::().expect("returns Struct"); let out_emb = out_struct.unmasked_field_by_name("emb").unwrap().clone(); let DType::Extension(ext) = out_emb.dtype() else { @@ -207,10 +208,7 @@ mod tests { let chunk = StructArray::from_fields(&[("id", id_array(&[0, 1])), ("emb", emb)])?.into_array(); - let transform = ChunkTransform { - src_ptype: PType::F32, - }; - let out = transform.apply(chunk, &mut ctx)?; + let out = transform_chunk(chunk, &mut ctx)?; let out_struct = out.as_opt::().expect("returns Struct"); assert_eq!(out_struct.len(), 2); Ok(()) diff --git a/benchmarks/vector-search-bench/src/lib.rs b/benchmarks/vector-search-bench/src/lib.rs index ea41e773f47..643cbb5bd0d 100644 --- a/benchmarks/vector-search-bench/src/lib.rs +++ b/benchmarks/vector-search-bench/src/lib.rs @@ -4,9 +4,12 @@ //! `vector-search-bench` vector similarity-search benchmark over several datasets. pub mod compression; +pub mod display; pub mod expression; pub mod ingest; pub mod prepare; +pub mod query; +pub mod scan; use std::sync::LazyLock; diff --git a/benchmarks/vector-search-bench/src/main.rs b/benchmarks/vector-search-bench/src/main.rs new file mode 100644 index 00000000000..626e3bfce50 --- /dev/null +++ b/benchmarks/vector-search-bench/src/main.rs @@ -0,0 +1,188 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! `vector-search-bench` — on-disk cosine-similarity scan benchmark. +//! +//! ```sh +//! cargo run -p vector-search-bench --release -- \ +//! --dataset cohere-large-10m \ +//! --layout partitioned \ +//! 
--flavors vortex-uncompressed,vortex-turboquant \ +//! --iterations 3 \ +//! --threshold 0.8 +//! ``` + +use std::path::PathBuf; + +use anyhow::Context; +use anyhow::Result; +use clap::Parser; +use vector_search_bench::compression::ALL_VECTOR_FLAVORS; +use vector_search_bench::compression::VectorFlavor; +use vector_search_bench::display::DatasetReport; +use vector_search_bench::display::render; +use vector_search_bench::prepare::CompressedVortexDataset; +use vector_search_bench::prepare::prepare_all; +use vector_search_bench::query::get_random_query_vector; +use vector_search_bench::scan::ScanConfig; +use vector_search_bench::scan::ScanTiming; +use vector_search_bench::scan::run_search_scan; +use vortex_bench::setup_logging_and_tracing; +use vortex_bench::vector_dataset; +use vortex_bench::vector_dataset::TrainLayout; +use vortex_bench::vector_dataset::VectorDataset; + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Dataset to benchmark. Single dataset per CLI invocation by design — large datasets + /// are intentionally babysat one at a time. + #[arg(long, value_enum)] + dataset: VectorDataset, + + /// Train-split layout. Required when the dataset publishes more than one layout. + /// Defaults to the catalog's first hosted layout when omitted. + #[arg(long, value_enum)] + layout: Option, + + /// Comma-separated list of flavors to run. Each Vortex flavor produces one `.vortex` file per + /// train shard. + #[arg( + long, + value_delimiter = ',', + value_enum, + default_values_t = ALL_VECTOR_FLAVORS.to_vec(), + )] + flavors: Vec, + + /// Number of timed scan iterations per flavor. Mean and median are reported. + #[arg(long, default_value_t = 5)] + iterations: usize, + + /// Cosine threshold passed to the filter expression. + #[arg(long, default_value_t = 0.85)] + threshold: f32, + + /// Seed for the test-parquet query sampler. 
+ #[arg(long, default_value_t = 42)] + query_seed: u64, + + /// Optional path to write the rendered table to instead of stdout. + #[arg(long)] + output_path: Option, + + /// Emit verbose tracing. + #[arg(short, long)] + verbose: bool, + + /// Enable perfetto tracing output. + #[arg(long)] + tracing: bool, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + setup_logging_and_tracing(args.verbose, args.tracing)?; + + let dataset = args.dataset; + let layout = resolve_layout(dataset, args.layout)?; + tracing::info!( + "running {} on layout {} ({} dims, {} rows)", + dataset.name(), + layout, + dataset.dim(), + dataset.num_rows() + ); + + if args.flavors.is_empty() { + anyhow::bail!("no flavors selected, please pass at least one to --flavors"); + } + + // Load the source embeddings parquet files. + let datasets_paths = vector_dataset::download(dataset, layout) + .await + .with_context(|| format!("download {}", dataset.name()))?; + + // Load all vortex files needed, compressing new ones if needed. + let prepared = prepare_all(dataset, layout, &datasets_paths, &args.flavors).await?; + + let query_vector = get_random_query_vector( + &datasets_paths.test, + dataset.dim(), + dataset.element_ptype(), + args.query_seed, + ) + .await?; + tracing::info!( + "sampled query id {} (dim={})", + query_vector.id, + query_vector.query.len() + ); + + let scan_config = ScanConfig { + iterations: args.iterations, + threshold: args.threshold, + }; + + // Run all scans and record how long each takes. + let mut scan_timings: Vec = Vec::with_capacity(prepared.len()); + for prep in &prepared { + let timing = run_search_scan(prep, &query_vector.query, &scan_config).await?; + scan_timings.push(timing); + } + + // Collect the benchmark results. 
+ let pairs: Vec<(VectorFlavor, &CompressedVortexDataset, &ScanTiming)> = prepared + .iter() + .zip(scan_timings.iter()) + .map(|(prep, scan)| (prep.flavor, prep, scan)) + .collect(); + let report = DatasetReport { + dataset_name: dataset.name(), + vortex_results: &pairs, + }; + + // Print the results. + if let Some(path) = args.output_path { + let mut file = + std::fs::File::create(&path).with_context(|| format!("create {}", path.display()))?; + render(&report, &mut file)?; + } else { + let stdout = std::io::stdout(); + let mut handle = stdout.lock(); + render(&report, &mut handle)?; + } + + Ok(()) +} + +/// Every benchmark has different sets of possible dataset layouts available. The user **must** +/// provide one if there are multiple layouts. But if a dataset only has 1 layout, we can choose +/// that for them as the default. +fn resolve_layout(dataset: VectorDataset, requested: Option) -> Result { + let layouts = dataset.layouts(); + + match requested { + Some(layout) => { + dataset.validate_layout(layout)?; + Ok(layout) + } + None => { + if layouts.len() == 1 { + Ok(layouts[0].layout()) + } else { + let allowed = layouts + .iter() + .map(|s| s.layout().label()) + .collect::>() + .join(", "); + anyhow::bail!( + "dataset {} hosts multiple layouts ([{}]): pass --layout to pick one", + dataset.name(), + allowed, + ); + } + } + } +} diff --git a/benchmarks/vector-search-bench/src/prepare.rs b/benchmarks/vector-search-bench/src/prepare.rs index 3674b353eac..78b8716e4a1 100644 --- a/benchmarks/vector-search-bench/src/prepare.rs +++ b/benchmarks/vector-search-bench/src/prepare.rs @@ -4,9 +4,9 @@ //! Per-flavor on-disk ingest. //! //! For each `(dataset, layout, flavor)` triple, [`prepare_flavor`] streams every parquet shard -//! through the [`crate::ingest::ChunkTransform`] and writes one `.vortex` file per shard. The -//! pipeline is idempotent (existing `.vortex` files are skipped) and reports end-to-end wall-clock -//! 
time, summed input parquet bytes, and total output bytes. +//! and writes one `.vortex` file per shard. The pipeline is idempotent (existing `.vortex` files +//! are skipped) and reports end-to-end wall-clock time, summed input parquet bytes, and total +//! output bytes. use std::path::Path; use std::path::PathBuf; @@ -19,6 +19,8 @@ use tokio::fs::File; use tokio::io::AsyncWriteExt; use tracing::info; use tracing::warn; +use vortex::array::ArrayRef; +use vortex::array::ExecutionCtx; use vortex::array::VortexSessionExecute; use vortex::array::stream::ArrayStreamAdapter; use vortex::array::stream::ArrayStreamExt; @@ -33,25 +35,25 @@ use vortex_bench::vector_dataset::VectorDataset; use crate::SESSION; use crate::compression::VectorFlavor; -use crate::ingest::ChunkTransform; +use crate::ingest::transform_chunk; /// The paths of the vortex files that result from preparing one `(dataset, layout, flavor)` triple. #[derive(Debug, Clone)] -pub struct CompressedVortexDataSet { +pub struct CompressedVortexDataset { pub dataset: VectorDataset, pub layout: TrainLayout, pub flavor: VectorFlavor, pub vortex_files: Vec, } -/// Drive [`prepare_flavor`] across a list of flavors, returning a [`CompressedVortexDataSet`] per flavor -/// in input order. +/// Drive [`prepare_flavor`] across a list of flavors, returning a [`CompressedVortexDataset`] per +/// flavor in input order. pub async fn prepare_all( dataset: VectorDataset, layout: TrainLayout, paths_for_dataset: &DatasetPaths, flavors: &[VectorFlavor], -) -> Result> { +) -> Result> { let mut results = Vec::with_capacity(flavors.len()); for &flavor in flavors { @@ -62,7 +64,6 @@ pub async fn prepare_all( Ok(results) } -// TODO(connor): This should probably download things in parallel? /// Prepare one flavor of one dataset by writing one `.vortex` file per train shard. /// /// This function is sequential (for now). 
@@ -71,11 +72,7 @@ pub async fn prepare_flavor( layout: TrainLayout, paths_for_dataset: &DatasetPaths, flavor: VectorFlavor, -) -> Result { - let transform = ChunkTransform { - src_ptype: dataset.element_ptype(), - }; - +) -> Result { let mut vortex_files = Vec::with_capacity(paths_for_dataset.train_files.len()); for parquet_path in &paths_for_dataset.train_files { @@ -99,14 +96,14 @@ pub async fn prepare_flavor( } let written_path = idempotent_async(vortex_path.as_path(), |tmp| async move { - write_shard_streaming(&parquet_path, &tmp, flavor, transform).await + write_shard_streaming(&parquet_path, &tmp, flavor).await }) .await?; vortex_files.push(written_path); } - Ok(CompressedVortexDataSet { + Ok(CompressedVortexDataset { dataset, layout, flavor, @@ -122,7 +119,6 @@ async fn write_shard_streaming( parquet_path: &Path, vortex_path: &Path, flavor: VectorFlavor, - transform: ChunkTransform, ) -> Result<()> { let file = File::open(parquet_path).await?; let builder = ParquetRecordBatchStreamBuilder::new(file).await?; @@ -132,7 +128,7 @@ async fn write_shard_streaming( // We need to get the first chunk so that we know what the dtype of the file is. let first = match array_stream.next().await { - Some(chunk) => transform_chunk(transform, chunk, &mut ctx, parquet_path, 1)?, + Some(chunk) => transform_chunk_with_error(chunk, &mut ctx, parquet_path, 1)?, None => { return Err(vortex_err!( "ingest: parquet shard {} produced no chunks", @@ -148,8 +144,7 @@ async fn write_shard_streaming( futures::stream::iter(std::iter::once(Ok(first))).chain(array_stream.enumerate().map( move |(chunk_offset, chunk_or_err)| { let mut local_ctx = SESSION.create_execution_ctx(); - transform_chunk( - transform, + transform_chunk_with_error( chunk_or_err, &mut local_ctx, &shard_path, @@ -167,6 +162,8 @@ async fn write_shard_streaming( .open(vortex_path) .await?; + // This will write in parallel, using `std::thread::available_parallelism()`. + // See `CompressingStrategy` for more details. 
flavor .create_write_options(&SESSION) .write(&mut output, stream) @@ -176,13 +173,12 @@ async fn write_shard_streaming( Ok(()) } -fn transform_chunk( - transform: ChunkTransform, - chunk_or_err: VortexResult, - ctx: &mut vortex::array::ExecutionCtx, +fn transform_chunk_with_error( + chunk_or_err: VortexResult, + ctx: &mut ExecutionCtx, parquet_path: &Path, chunk_idx: usize, -) -> VortexResult { +) -> VortexResult { let chunk = chunk_or_err.map_err(|err| { vortex_err!( "ingest: failed to read chunk {} from {}: {err:#}", @@ -191,7 +187,7 @@ fn transform_chunk( ) })?; - transform.apply(chunk, ctx).map_err(|err| { + transform_chunk(chunk, ctx).map_err(|err| { vortex_err!( "ingest: failed to transform chunk {} from {}: {err:#}", chunk_idx, diff --git a/benchmarks/vector-search-bench/src/query.rs b/benchmarks/vector-search-bench/src/query.rs new file mode 100644 index 00000000000..cbd9d3bf38e --- /dev/null +++ b/benchmarks/vector-search-bench/src/query.rs @@ -0,0 +1,120 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Sample one query vector from `test.parquet`. +//! +//! The vector datasets ship a `test.parquet` alongside the train split: these are the query vectors +//! meant to be issued against the index. +//! +//! The benchmark picks a single random row (seeded for reproducibility) and uses it as the query +//! for the scan. + +use std::path::Path; + +use anyhow::Context; +use anyhow::Result; +use anyhow::bail; +use anyhow::ensure; +use rand::RngExt; +use rand::SeedableRng; +use rand::rngs::StdRng; +use vortex::array::IntoArray; +use vortex::array::VortexSessionExecute; +use vortex::array::arrays::StructArray; +use vortex::array::arrays::struct_::StructArrayExt; +use vortex::dtype::PType; +use vortex::error::VortexExpect; +use vortex::error::vortex_err; +use vortex_bench::conversions::parquet_to_vortex_chunks; + +use crate::SESSION; + +/// One query vector sampled from `test.parquet`. 
+#[derive(Debug, Clone)] +pub struct QuerySample { + /// The ID of the vector. + pub id: i64, + /// f32 query values, length `dim`. + pub query: Vec, +} + +/// Sample one query row from `test.parquet`. +/// +/// The cast to f32 happens here when the source is f64 (matching the prepare-side cast), so that +/// all downstream code is uniformly f32. +pub async fn get_random_query_vector( + test_parquet: &Path, + expected_dim: u32, + src_ptype: PType, + seed: u64, +) -> Result { + let mut ctx = SESSION.create_execution_ctx(); + + let chunked = parquet_to_vortex_chunks(test_parquet.to_path_buf()) + .await + .with_context(|| format!("read test parquet {}", test_parquet.display()))?; + // The `test.parquet` files are generally small enough that this is not a big deal. + let struct_array: StructArray = chunked.into_array().execute(&mut ctx)?; + + let id = struct_array + .unmasked_field_by_name("id") + .context("test parquet missing `id` column")? + .clone(); + let emb = struct_array + .unmasked_field_by_name("emb") + .context("test parquet missing `emb` column")? 
+ .clone(); + + let mut rng = StdRng::seed_from_u64(seed); + let query_row_idx = rng.random_range(0..id.len()); + + let id_scalar = id.execute_scalar(query_row_idx, &mut ctx)?; + let emb_scalar = emb.execute_scalar(query_row_idx, &mut ctx)?; + + ensure!(emb_scalar.as_list().len() == expected_dim as usize); + + let id = id_scalar + .as_primitive() + .as_::() + .ok_or_else(|| vortex_err!("vector ID was not a i64"))?; + + let query_vector = match src_ptype { + PType::F32 => emb_scalar + .as_list() + .elements() + .vortex_expect("somehow had a null test vector") + .iter() + .map(|element| { + element + .as_primitive() + .as_::() + .vortex_expect("value was not a f32") + }) + .collect(), + PType::F64 => + { + #[expect( + clippy::cast_possible_truncation, + reason = "this is intentionally lossy" + )] + emb_scalar + .as_list() + .elements() + .vortex_expect("somehow had a null test vector") + .iter() + .map(|element| { + element + .as_primitive() + .as_::() + .vortex_expect("value was not a f64") as f32 + }) + .collect() + } + ptype => bail!("source ptype {ptype} was somehow not f32 or f64"), + }; + + Ok(QuerySample { + query: query_vector, + id, + }) +} diff --git a/benchmarks/vector-search-bench/src/scan.rs b/benchmarks/vector-search-bench/src/scan.rs new file mode 100644 index 00000000000..81a054f74b3 --- /dev/null +++ b/benchmarks/vector-search-bench/src/scan.rs @@ -0,0 +1,177 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Per-iteration scan driver. +//! +//! Each iteration re-opens every `.vortex` shard fresh (so the segment cache is re-primed +//! per run), pushes the cosine-similarity filter through the scan, and drains the resulting +//! [`vortex::array::stream::ArrayStream`]. The wall-clock around the entire per-iteration +//! pass is the headline number; we track the mean and median across iterations. 
+ +use std::path::Path; +use std::path::PathBuf; +use std::time::Duration; +use std::time::Instant; + +use anyhow::Context; +use anyhow::Result; +use futures::TryStreamExt; +use vortex::array::ArrayRef; +use vortex::file::OpenOptionsSessionExt; + +use crate::SESSION; +use crate::compression::VectorFlavor; +use crate::expression::similarity_filter; +use crate::prepare::CompressedVortexDataset; + +/// Inputs to a scan run. +#[derive(Debug, Clone)] +pub struct ScanConfig { + /// Number of timed iterations; the mean and median across them are reported. + pub iterations: usize, + /// Cosine threshold passed to the filter expression. + pub threshold: f32, +} + +/// Aggregate timing + counters for one `(flavor)` scan. +#[derive(Debug, Clone)] +pub struct ScanTiming { + /// Which compression flavor's `.vortex` files were scanned. + pub flavor: VectorFlavor, + /// Arithmetic mean of the per-iteration wall times. + pub mean: Duration, + /// Median of the per-iteration wall times. + pub median: Duration, + /// Per-iteration wall times in run order. + pub all_runs: Vec, + /// Number of rows that survived the filter (constant across iterations because the + /// filter is deterministic). + pub matches: u64, + /// Total rows scanned (sum of file row counts) as a sanity check that the iteration + /// actually walked the files. + pub rows_scanned: u64, + /// Total on-disk size of the scanned `.vortex` files, in bytes. + pub bytes_scanned: u64, +} + +/// Scan every shard in a [`CompressedVortexDataset`] under the given config.
+pub async fn run_search_scan( + dataset: &CompressedVortexDataset, + query: &[f32], + config: &ScanConfig, +) -> Result { + anyhow::ensure!( + config.iterations > 0, + "scan iterations must be >= 1, got {}", + config.iterations + ); + + let bytes_scanned = total_file_size(&dataset.vortex_files)?; + + let mut all_runs = Vec::with_capacity(config.iterations); + let mut matches = 0u64; + let mut rows_scanned = 0u64; + + for iter_idx in 0..config.iterations { + let (wall, iter_matches, iter_rows) = + run_one_iteration(&dataset.vortex_files, query, config.threshold).await?; + tracing::debug!( + "{} iter {} -> {:?} ({} matches, {} rows)", + dataset.flavor.label(), + iter_idx, + wall, + iter_matches, + iter_rows, + ); + // Matches and row counts are deterministic across iterations; reset rather than + // accumulate so the reported value matches a single pass. + matches = iter_matches; + rows_scanned = iter_rows; + all_runs.push(wall); + } + + Ok(ScanTiming { + flavor: dataset.flavor, + mean: mean(&all_runs), + median: median(&all_runs), + all_runs, + matches, + rows_scanned, + bytes_scanned, + }) +} + +/// Sum the on-disk sizes of the given files. 
+fn total_file_size(paths: &[PathBuf]) -> Result { + let mut total = 0u64; + for path in paths { + let meta = + std::fs::metadata(path).with_context(|| format!("stat {} for size", path.display()))?; + total = total.saturating_add(meta.len()); + } + Ok(total) +} + +async fn run_one_iteration( + vortex_files: &[PathBuf], + query: &[f32], + threshold: f32, +) -> Result<(Duration, u64, u64)> { + let mut matches = 0u64; + let mut rows_scanned = 0u64; + + let started = Instant::now(); + for path in vortex_files { + let (m, r) = scan_one_file(path, query, threshold).await?; + matches = matches.saturating_add(m); + rows_scanned = rows_scanned.saturating_add(r); + } + + Ok((started.elapsed(), matches, rows_scanned)) +} + +async fn scan_one_file(path: &Path, query: &[f32], threshold: f32) -> Result<(u64, u64)> { + let file = SESSION + .open_options() + .open_path(path) + .await + .with_context(|| format!("open {}", path.display()))?; + + let total_rows = file.row_count(); + let filter = similarity_filter(query, threshold)?; + let chunks: Vec = file + .scan()? + .with_filter(filter) + .into_array_stream()? + .try_collect() + .await?; + + let matches: u64 = chunks.iter().map(|c| c.len() as u64).sum(); + Ok((matches, total_rows)) +} + +/// Arithmetic mean of a list of [`Duration`]s. Empty lists return [`Duration::ZERO`]. +pub fn mean(runs: &[Duration]) -> Duration { + if runs.is_empty() { + return Duration::ZERO; + } + let total_nanos: u128 = runs.iter().map(|d| d.as_nanos()).sum(); + let avg_nanos = total_nanos / runs.len() as u128; + Duration::from_nanos(u64::try_from(avg_nanos).unwrap_or(u64::MAX)) +} + +/// Median of a list of [`Duration`]s. Empty lists return [`Duration::ZERO`]. 
+pub fn median(runs: &[Duration]) -> Duration { + if runs.is_empty() { + return Duration::ZERO; + } + let mut sorted = runs.to_vec(); + sorted.sort(); + let mid = sorted.len() / 2; + if sorted.len() % 2 == 1 { + sorted[mid] + } else { + let total_nanos = sorted[mid - 1].as_nanos() + sorted[mid].as_nanos(); + Duration::from_nanos(u64::try_from(total_nanos / 2).unwrap_or(u64::MAX)) + } +}