diff --git a/Cargo.lock b/Cargo.lock index ec89bf1161f..e66d4c48bc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3146,8 +3146,7 @@ checksum = "f8eb564c5c7423d25c886fb561d1e4ee69f72354d16918afa32c08811f6b6a55" [[package]] name = "fastlanes" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "414cb755aee48ff7b0907995d2949c68c8c17900970076dff6a808e18e592d71" +source = "git+https://github.com/spiraldb/fastlanes?rev=267717cd72e8b6f0ed0e5321ae3fc785fa433058#267717cd72e8b6f0ed0e5321ae3fc785fa433058" dependencies = [ "arrayref", "const_for", diff --git a/Cargo.toml b/Cargo.toml index 9700d8d78ed..72d24a43ad3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -412,3 +412,9 @@ debug = false debug-assertions = false strip = "debuginfo" incremental = false + +# Temporary patch: build against the fastlanes revision carrying the `delta_for_bitpacking` +# fused `Delta::unfor_undelta_pack` kernel (spiraldb/fastlanes#140, rev 267717c). Replace with a +# published fastlanes version bump once that PR merges and releases. +[patch.crates-io] +fastlanes = { git = "https://github.com/spiraldb/fastlanes", rev = "267717cd72e8b6f0ed0e5321ae3fc785fa433058" } diff --git a/encodings/fastlanes/Cargo.toml b/encodings/fastlanes/Cargo.toml index 08c96c481d7..3f467461f11 100644 --- a/encodings/fastlanes/Cargo.toml +++ b/encodings/fastlanes/Cargo.toml @@ -36,10 +36,13 @@ rand = { workspace = true } rstest = { workspace = true } vortex-alp = { path = "../alp" } vortex-array = { workspace = true, features = ["_test-harness"] } -vortex-fastlanes = { path = ".", features = ["_test-harness"] } +vortex-fastlanes = { path = ".", features = ["_test-harness", "unstable_encodings"] } [features] _test-harness = ["dep:rand"] +# Unstable encodings/decoders with no stability guarantee. Enables the fused +# delta(for(bitpacking)) decode kernel from the fastlanes crate. +unstable_encodings = ["fastlanes/delta_for_bitpacking"] [[bench]] name = "bitpacking_take" @@ -64,6 +67,11 @@ required-features = ["_test-harness"] name = "bitpack_compare" harness = false +[[bench]] +name = "delta_for_bitpack" +harness = false +required-features = ["unstable_encodings", "_test-harness"] + [[bench]] name = "cast_bitpacked" harness = false diff --git a/encodings/fastlanes/benches/delta_for_bitpack.rs b/encodings/fastlanes/benches/delta_for_bitpack.rs new file mode 100644 index 00000000000..950ba840d7e --- /dev/null +++ b/encodings/fastlanes/benches/delta_for_bitpack.rs @@ -0,0 +1,117 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! A/B decode of a `delta(for(bitpacking))` column, both arms going through the real Vortex decode +//! entry points on the *same* array: +//! * `fused` — `delta_decompress` with the fused `unfor_undelta_pack` fast path. +//! * `current` — `delta_decompress_generic`, the path Vortex took before the fused kernel: +//! materialize the FoR(bitpacked) deltas child, then un-delta + untranspose. +//! +//! The column is non-strictly-increasing (monotone non-decreasing) so it compresses as +//! delta(for(bitpacking)). +//! +//! Run with `cargo bench -p vortex-fastlanes --bench delta_for_bitpack +//! --features unstable_encodings,_test-harness`. + +#![expect(clippy::unwrap_used)] +#![expect(clippy::cast_possible_truncation)] + +use divan::Bencher; +use divan::counter::ItemsCount; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::primitive::PrimitiveArrayExt; +use vortex_array::match_each_unsigned_integer_ptype; +use vortex_fastlanes::Delta; +use vortex_fastlanes::DeltaArray; +use vortex_fastlanes::FoR; +use vortex_fastlanes::FoRArrayExt; +use vortex_fastlanes::bitpack_compress::bitpack_encode; +use vortex_fastlanes::delta_compress; +use vortex_fastlanes::delta_decompress; +use vortex_fastlanes::delta_decompress_generic; + +fn main() { + divan::main(); +} + +// Exact multiples of 1024 so the deltas bit-pack without a zero-padding wrap. +const LENS: &[usize] = &[64 * 1024, 1024 * 1024]; + +/// Build the `delta(for(bitpacking))` stack for `values`. +fn build(values: PrimitiveArray) -> (DeltaArray, usize, ExecutionCtx) { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let len = values.len(); + + let (bases, deltas) = delta_compress(&values, &mut ctx).unwrap(); + let for_deltas = FoR::encode(deltas).unwrap(); + let reference = for_deltas.reference_scalar().clone(); + let for_encoded = for_deltas + .encoded() + .clone() + .execute::(&mut ctx) + .unwrap(); + + // Smallest width that captures every value, so bit-packing introduces no patches. + let unsigned = for_encoded.ptype().to_unsigned(); + let bit_width = match_each_unsigned_integer_ptype!(unsigned, |T| { + let reinterpreted = for_encoded.reinterpret_cast(unsigned); + let max = reinterpreted + .as_slice::() + .iter() + .copied() + .max() + .unwrap_or_default(); + (T::BITS - max.leading_zeros()) as u8 + }); + let bitpacked = bitpack_encode(&for_encoded, bit_width, None, &mut ctx).unwrap(); + + let for_child = FoR::try_new(bitpacked.into_array(), reference) + .unwrap() + .into_array(); + let array = Delta::try_new(bases.into_array(), for_child, 0, len).unwrap(); + (array, len, ctx) +} + +fn u32_non_decreasing(len: usize) -> PrimitiveArray { + PrimitiveArray::from_iter((0..len as u32).map(|i| i / 4)) +} + +fn u64_non_decreasing(len: usize) -> PrimitiveArray { + PrimitiveArray::from_iter((0..len as u64).map(|i| (i / 6) * 3)) +} + +#[divan::bench(args = LENS)] +fn fused_u32(bencher: Bencher, len: usize) { + let (array, n, mut ctx) = build(u32_non_decreasing(len)); + bencher + .counter(ItemsCount::new(n)) + .bench_local(|| delta_decompress(&array, &mut ctx).unwrap()); +} + +#[divan::bench(args = LENS)] +fn current_u32(bencher: Bencher, len: usize) { + let (array, n, mut ctx) = build(u32_non_decreasing(len)); + bencher + .counter(ItemsCount::new(n)) + .bench_local(|| delta_decompress_generic(&array, &mut ctx).unwrap()); +} + +#[divan::bench(args = LENS)] +fn fused_u64(bencher: Bencher, len: usize) { + let (array, n, mut ctx) = build(u64_non_decreasing(len)); + bencher + .counter(ItemsCount::new(n)) + .bench_local(|| delta_decompress(&array, &mut ctx).unwrap()); +} + +#[divan::bench(args = LENS)] +fn current_u64(bencher: Bencher, len: usize) { + let (array, n, mut ctx) = build(u64_non_decreasing(len)); + bencher + .counter(ItemsCount::new(n)) + .bench_local(|| delta_decompress_generic(&array, &mut ctx).unwrap()); +} diff --git a/encodings/fastlanes/src/delta/array/delta_compress.rs b/encodings/fastlanes/src/delta/array/delta_compress.rs index e35778ad29e..1db5b88ca5a 100644 --- a/encodings/fastlanes/src/delta/array/delta_compress.rs +++ b/encodings/fastlanes/src/delta/array/delta_compress.rs @@ -105,20 +105,88 @@ mod tests { use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; use vortex_array::arrays::PrimitiveArray; + use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::assert_arrays_eq; + #[cfg(feature = "unstable_encodings")] + use vortex_array::match_each_unsigned_integer_ptype; use vortex_array::session::ArraySession; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_session::VortexSession; use crate::Delta; + #[cfg(feature = "unstable_encodings")] + use crate::FoR; use crate::bitpack_compress::bitpack_encode; use crate::delta::array::delta_decompress::delta_decompress; use crate::delta_compress; + #[cfg(feature = "unstable_encodings")] + use crate::r#for::FoRArrayExt; static SESSION: LazyLock = LazyLock::new(|| VortexSession::empty().with::()); + /// Build a `delta(for(bitpacking))` stack from `array`: delta-encode, then FoR + bit-pack the + /// resulting deltas. This is the exact tree the fused decode path in `delta_decompress` + /// recognizes. + #[cfg(feature = "unstable_encodings")] + fn build_delta_for_bitpacked( + array: &PrimitiveArray, + ctx: &mut vortex_array::ExecutionCtx, + ) -> VortexResult { + let (bases, deltas) = delta_compress(array, ctx)?; + let for_deltas = FoR::encode(deltas)?; + let reference = for_deltas.reference_scalar().clone(); + let for_encoded = for_deltas + .encoded() + .clone() + .execute::(ctx)?; + // Pick the smallest width that captures every value so bit-packing introduces no patches, + // keeping the array on the fused decode path. + let unsigned = for_encoded.ptype().to_unsigned(); + let bit_width = match_each_unsigned_integer_ptype!(unsigned, |T| { + let reinterpreted = for_encoded.reinterpret_cast(unsigned); + let max = reinterpreted + .as_slice::() + .iter() + .copied() + .max() + .unwrap_or_default(); + (T::BITS - max.leading_zeros()) as u8 + }); + let bitpacked = bitpack_encode(&for_encoded, bit_width, None, ctx)?; + let fused_for = FoR::try_new(bitpacked.into_array(), reference)?; + Delta::try_new(bases.into_array(), fused_for.into_array(), 0, array.len()) + } + + /// Non-strictly-increasing (monotone non-decreasing) integer columns. Consecutive equal runs + /// make many deltas zero, so the per-lane FoR reference over the deltas is small and the deltas + /// bit-pack tightly — exactly the shape that produces a delta(for(bitpacking)) stack. + /// + /// Lengths are exact multiples of 1024 so there is no zero-padding tail. (Padding can make a + /// lane straddle the real/zero boundary, producing a wrapping delta that forces full width.) + #[cfg(feature = "unstable_encodings")] + #[rstest] + #[case::u32_non_decreasing((0u32..20_480).map(|i| i / 3).collect())] + #[case::u64_non_decreasing((0u64..20_480).map(|i| (i / 5) * 2).collect())] + #[case::u32_long_runs((0u32..20_480).map(|i| i / 100).collect())] + fn fused_for_bitpacking_roundtrip(#[case] array: PrimitiveArray) -> VortexResult<()> { + use crate::delta::array::delta_decompress::try_fused_for_bitpacking; + + let mut ctx = SESSION.create_execution_ctx(); + let stack = build_delta_for_bitpacked(&array, &mut ctx)?; + + // The stack must take the fused decode path, not silently fall back to the generic one. + assert!( + try_fused_for_bitpacking(&stack, &mut ctx)?.is_some(), + "delta(for(bitpacking)) must be recognized by the fused decode path" + ); + + let decompressed = stack.into_array().execute::(&mut ctx)?; + assert_arrays_eq!(decompressed, array); + Ok(()) + } + #[rstest] #[case::u32((0u32..10_000).collect())] #[case::u8((0..10_000).map(|i| (i % (u8::MAX as i32)) as u8).collect())] diff --git a/encodings/fastlanes/src/delta/array/delta_decompress.rs b/encodings/fastlanes/src/delta/array/delta_decompress.rs index fe2567e63c7..07d7514a3ab 100644 --- a/encodings/fastlanes/src/delta/array/delta_decompress.rs +++ b/encodings/fastlanes/src/delta/array/delta_decompress.rs @@ -15,15 +15,42 @@ use vortex_array::dtype::NativePType; use vortex_array::match_each_unsigned_integer_ptype; use vortex_buffer::Buffer; use vortex_buffer::BufferMut; +#[cfg(feature = "unstable_encodings")] +use vortex_error::VortexExpect; use vortex_error::VortexResult; +#[cfg(feature = "unstable_encodings")] +use crate::BitPacked; +#[cfg(feature = "unstable_encodings")] +use crate::BitPackedArrayExt; use crate::DeltaArray; +#[cfg(feature = "unstable_encodings")] +use crate::FoR; use crate::bit_transpose::untranspose_validity; use crate::delta::array::DeltaArrayExt; +#[cfg(feature = "unstable_encodings")] +use crate::r#for::FoRArrayExt; pub fn delta_decompress( array: &DeltaArray, ctx: &mut ExecutionCtx, +) -> VortexResult { + // Fast path: a fully fused `delta(for(bitpacking))` decode that unpacks, applies the + // frame-of-reference, and inverts the delta encoding in a single pass over the packed buffer. + #[cfg(feature = "unstable_encodings")] + if let Some(decoded) = try_fused_for_bitpacking(array, ctx)? { + return Ok(decoded); + } + + delta_decompress_generic(array, ctx) +} + +/// The generic delta decode: fully materialize the `deltas` child, then invert the delta encoding +/// (un-delta + untranspose). This is the path taken for every stack that the fused fast path does +/// not recognize, and the one Vortex used before the fused `delta(for(bitpacking))` kernel existed. +pub fn delta_decompress_generic( + array: &DeltaArray, + ctx: &mut ExecutionCtx, ) -> VortexResult { let bases = array.bases().clone().execute::(ctx)?; let deltas = array.deltas().clone().execute::(ctx)?; @@ -52,6 +79,126 @@ pub fn delta_decompress( Ok(decoded.reinterpret_cast(original_ptype)) } +/// Attempts the fused `delta(for(bitpacking))` decode. +/// +/// Returns `Some` when the `deltas` child is a [`FoR`] array with an unsigned reference wrapping a +/// [`BitPacked`] array stored as full, zero-offset chunks with no patches. In that case the packed +/// deltas are unpacked, FoR-decoded, and un-delta'd in a single pass via +/// [`Delta::unchecked_unfor_undelta_pack`]. Otherwise returns `None` so the caller falls back to the +/// generic path. +#[cfg(feature = "unstable_encodings")] +pub(crate) fn try_fused_for_bitpacking( + array: &DeltaArray, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let Some(for_) = array.deltas().as_opt::() else { + return Ok(None); + }; + // The fused kernel works in unsigned wrapping arithmetic; a signed reference would need a + // bit-reinterpret that the generic path already handles correctly. + if !for_.reference_scalar().dtype().is_unsigned_int() { + return Ok(None); + } + let Some(bp) = for_.encoded().as_opt::() else { + return Ok(None); + }; + // Patches and sliced (non-zero offset) bit-packed children are left to the generic path. + if bp.patches().is_some() || bp.offset() != 0 { + return Ok(None); + } + + let bases = array.bases().clone().execute::(ctx)?; + + let start = array.offset(); + let end = start + array.len(); + + let validity = untranspose_validity(&bp.validity()?, ctx)?; + let validity = validity.slice(start..end)?; + + let original_ptype = for_.ptype(); + let unsigned_ptype = original_ptype.to_unsigned(); + let bases = bases.reinterpret_cast(unsigned_ptype); + + let decoded = match_each_unsigned_integer_ptype!(unsigned_ptype, |T| { + const LANES: usize = T::LANES; + + let reference = for_ + .reference_scalar() + .as_primitive() + .as_::() + .vortex_expect("FoR reference must be non-null and unsigned"); + let packed = bp.packed_slice::(); + + let buffer = decompress_fused::( + bases.as_slice(), + packed, + bp.bit_width() as usize, + reference, + bp.len(), + ); + let buffer = buffer.slice(start..end); + + PrimitiveArray::new(buffer, validity) + }); + + Ok(Some(decoded.reinterpret_cast(original_ptype))) +} + +/// Fused low-level decode of bit-packed, FoR-encoded deltas. +/// +/// `packed` holds `num_values / 1024` chunks each of `128 * bit_width / size_of::()` packed +/// words. Each chunk is unpacked, FoR-decoded (wrapping-add `reference`) and un-delta'd in a single +/// pass, then untransposed back into logical order. +#[cfg(feature = "unstable_encodings")] +pub(crate) fn decompress_fused( + bases: &[T], + packed: &[T], + bit_width: usize, + reference: T, + num_values: usize, +) -> Buffer +where + T: NativePType + Delta + Transpose, +{ + debug_assert!( + num_values.is_multiple_of(1024), + "bit-packed deltas must be padded to a multiple of 1024" + ); + let num_chunks = num_values / 1024; + let elems_per_chunk = 128 * bit_width / size_of::(); + debug_assert_eq!(packed.len(), num_chunks * elems_per_chunk); + assert!(bases.len() >= num_chunks * LANES); + + let mut output = BufferMut::with_capacity(num_values); + let (output_chunks, _) = output.spare_capacity_mut().as_chunks_mut::<1024>(); + + let mut transposed: [T; 1024] = [T::default(); 1024]; + for (i, output_chunk) in output_chunks.iter_mut().enumerate() { + let packed_chunk = &packed[i * elems_per_chunk..(i + 1) * elems_per_chunk]; + let base = &bases[i * LANES..(i + 1) * LANES]; + + // SAFETY: `packed_chunk` has length `128 * bit_width / size_of::()`, `base` has length + // `LANES`, and `transposed` has length 1024, satisfying the kernel's contract. + unsafe { + Delta::unchecked_unfor_undelta_pack( + bit_width, + packed_chunk, + reference, + base, + &mut transposed, + ); + } + + Transpose::untranspose(&transposed, unsafe { + mem::transmute::<&mut [MaybeUninit; 1024], &mut [T; 1024]>(output_chunk) + }); + } + + unsafe { output.set_len(num_values) }; + + output.freeze() +} + /// Performs the low-level delta decompression on primitive values. /// /// All chunks must be full 1024-element chunks (deltas length must be a multiple of 1024). diff --git a/encodings/fastlanes/src/delta/mod.rs b/encodings/fastlanes/src/delta/mod.rs index 52ea9b33574..e5774f7f654 100644 --- a/encodings/fastlanes/src/delta/mod.rs +++ b/encodings/fastlanes/src/delta/mod.rs @@ -4,6 +4,10 @@ mod array; pub use array::DeltaData; pub use array::delta_compress::delta_compress; +// Exposed for benchmarks: decode entry points so a bench can A/B the fused fast path against the +// generic (pre-fusion) decode on the same array. +#[cfg(feature = "_test-harness")] +pub use array::delta_decompress::{delta_decompress, delta_decompress_generic}; mod compute; diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 1adb6508828..8dd7a6c92e0 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -53,6 +53,7 @@ unstable_encodings = [ "dep:vortex-tensor", "dep:vortex-onpair", "vortex-zstd?/unstable_encodings", + "vortex-fastlanes/unstable_encodings", ] pco = ["dep:pco", "dep:vortex-pco"] zstd = ["dep:vortex-zstd"]