diff --git a/Cargo.lock b/Cargo.lock index 9189591e620..6833328367d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1645,6 +1645,12 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_detect" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f8f80099a98041a3d1622845c271458a2d73e688351bf3cb999266764b81d48" + [[package]] name = "cpubits" version = "0.1.1" @@ -3145,12 +3151,13 @@ checksum = "f8eb564c5c7423d25c886fb561d1e4ee69f72354d16918afa32c08811f6b6a55" [[package]] name = "fastlanes" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "414cb755aee48ff7b0907995d2949c68c8c17900970076dff6a808e18e592d71" +checksum = "20c597e23b8ec8506f589d18bc701ca83a3def6086748f628ad23092e1dfe577" dependencies = [ "arrayref", "const_for", + "core_detect", "num-traits", "paste", "seq-macro", diff --git a/Cargo.toml b/Cargo.toml index e3c3cbae67e..cc5201a0ff7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ dirs = "6.0.0" divan = { package = "codspeed-divan-compat", version = "4.0.4" } enum-iterator = "2.0.0" env_logger = "0.11" -fastlanes = "0.5" +fastlanes = "0.5.1" flatbuffers = "25.2.10" fsst-rs = "0.5.11" futures = { version = "0.3.31", default-features = false } diff --git a/encodings/fastlanes/src/bitpacking/array/unpack_iter.rs b/encodings/fastlanes/src/bitpacking/array/unpack_iter.rs index 5e0673186e3..c9158d8bdac 100644 --- a/encodings/fastlanes/src/bitpacking/array/unpack_iter.rs +++ b/encodings/fastlanes/src/bitpacking/array/unpack_iter.rs @@ -259,6 +259,34 @@ impl> UnpackedChunks { debug_assert_eq!(local_idx, self.len); } + /// Walk every *packed* chunk in array order, yielding the raw packed FastLanes block and the + /// padded bit range it covers, without unpacking it. + /// + /// Unlike [`Self::for_each_unpacked_chunk`], this does not fill the scratch buffer: it hands + /// the still-packed block to the callback so fused kernels (e.g. compare) can unpack and + /// consume it in a single pass. Each yielded block holds exactly `elems_per_chunk` packed + /// values (the buffer is zero-padded out to a whole final chunk). + /// + /// The yielded range is in *padded* coordinates: block `c` covers + /// `[c * 1024, min((c + 1) * 1024, offset + len))`, so it includes the leading `offset` rows + /// that slicing skips. Block starts are therefore always 1024-aligned regardless of `offset`. + /// Callers must account for the array's `offset` when mapping a block's rows back to logical + /// output positions (e.g. by viewing the output buffer at a bit offset of `offset`). + pub(crate) fn for_each_packed_chunk(&self, mut f: F) + where + F: FnMut(&[T::Physical], Range), + { + let packed_slice: &[T::Physical] = buffer_as_slice(&self.packed); + let elems_per_chunk = self.elems_per_chunk(); + let padded_len = self.offset + self.len; + for chunk in 0..self.num_chunks { + let packed_chunk = &packed_slice[chunk * elems_per_chunk..][..elems_per_chunk]; + let start = chunk * CHUNK_SIZE; + let end = (start + CHUNK_SIZE).min(padded_len); + f(packed_chunk, start..end); + } + } + /// Unpack full chunks into output range starting at the given index. fn decode_full_chunks_into_at( &mut self, diff --git a/encodings/fastlanes/src/bitpacking/compute/compare.rs b/encodings/fastlanes/src/bitpacking/compute/compare.rs index f5c5c81c5cb..4fbc4cf8b9d 100644 --- a/encodings/fastlanes/src/bitpacking/compute/compare.rs +++ b/encodings/fastlanes/src/bitpacking/compute/compare.rs @@ -8,11 +8,15 @@ //! a [`BitBuffer`]. Patches are re-applied at the end by overwriting bits at the patched //! indices with `predicate(patch_value)`. +use fastlanes::BitPacking; +use fastlanes::BitPackingCompare; +use fastlanes::FastLanesComparable; use vortex_array::ArrayRef; use vortex_array::ArrayView; use vortex_array::ExecutionCtx; use vortex_array::dtype::NativePType; use vortex_array::dtype::Nullability; +use vortex_array::dtype::PhysicalPType; use vortex_array::match_each_integer_ptype; use vortex_array::scalar_fn::fns::binary::CompareKernel; use vortex_array::scalar_fn::fns::operators::CompareOperator; @@ -20,7 +24,8 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use crate::BitPacked; -use crate::bitpacking::compute::stream_predicate::stream_predicate; +use crate::bitpacking::compute::compare_fused::stream_compare_fused; +use crate::unpack_iter::BitPacked as BitPackedIter; impl CompareKernel for BitPacked { fn compare( @@ -55,6 +60,10 @@ impl CompareKernel for BitPacked { } } +/// Compare every value against the constant via the fused FastLanes `unpack_cmp` kernel. +/// +/// `NativePType::is_eq` / `is_lt` etc. provide total comparison (matching the primitive between +/// kernel's dispatch shape). `NotEq` has no direct method, so use `!is_eq`. fn compare_constant_typed( lhs: ArrayView<'_, BitPacked>, rhs: T, @@ -63,19 +72,30 @@ fn compare_constant_typed( ctx: &mut ExecutionCtx, ) -> VortexResult where - T: NativePType + Copy + crate::unpack_iter::BitPacked, + T: NativePType + + BitPackedIter + + FastLanesComparable::Physical>, + ::Physical: BitPacking + NativePType + BitPackingCompare, { - // `NativePType::is_eq` / `is_lt` etc. provide total comparison (matching the primitive - // between kernel's dispatch shape). `NotEq` has no direct method, so use `!is_eq`. match operator { - CompareOperator::Eq => stream_predicate::(lhs, nullability, |v| v.is_eq(rhs), ctx), + CompareOperator::Eq => { + stream_compare_fused::(lhs, rhs, nullability, |a, b| a.is_eq(b), ctx) + } CompareOperator::NotEq => { - stream_predicate::(lhs, nullability, |v| !v.is_eq(rhs), ctx) + stream_compare_fused::(lhs, rhs, nullability, |a, b| !a.is_eq(b), ctx) + } + CompareOperator::Lt => { + stream_compare_fused::(lhs, rhs, nullability, |a, b| a.is_lt(b), ctx) + } + CompareOperator::Lte => { + stream_compare_fused::(lhs, rhs, nullability, |a, b| a.is_le(b), ctx) + } + CompareOperator::Gt => { + stream_compare_fused::(lhs, rhs, nullability, |a, b| a.is_gt(b), ctx) + } + CompareOperator::Gte => { + stream_compare_fused::(lhs, rhs, nullability, |a, b| a.is_ge(b), ctx) } - CompareOperator::Lt => stream_predicate::(lhs, nullability, |v| v.is_lt(rhs), ctx), - CompareOperator::Lte => stream_predicate::(lhs, nullability, |v| v.is_le(rhs), ctx), - CompareOperator::Gt => stream_predicate::(lhs, nullability, |v| v.is_gt(rhs), ctx), - CompareOperator::Gte => stream_predicate::(lhs, nullability, |v| v.is_ge(rhs), ctx), } } @@ -191,6 +211,78 @@ mod tests { Ok(()) } + /// Sliced inputs: a non-zero block offset (and a length spanning several blocks) must still go + /// through the fused kernel and agree with the primitive fallback. Sweeps slice starts that + /// land both inside the first block and past it, with lengths that end mid-block and on a block + /// boundary. + #[rstest] + #[case(1, 4000)] // start mid-first-block, multi-block length + #[case(1023, 2)] // start at the last row of the first block + #[case(1024, 1024)] // start exactly on a block boundary, exactly one block long + #[case(1500, 1000)] // start mid-second-block + #[case(3, 1021)] // ends exactly on the first block boundary + fn sliced_matches_primitive( + #[case] start: usize, + #[case] slice_len: usize, + ) -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let values: Vec = (0..5000u32).map(|i| i % 128).collect(); + let prim = PrimitiveArray::from_iter(values); + let packed = BitPackedData::encode(&prim.clone().into_array(), 7, &mut ctx)?; + + let sliced = packed.into_array().slice(start..start + slice_len)?; + let rhs = ConstantArray::new(50u32, slice_len).into_array(); + for op in [ + CompareOperator::Eq, + CompareOperator::Lt, + CompareOperator::Gte, + ] { + let got = ::compare( + sliced.as_::(), + &rhs, + op, + &mut ctx, + )? + .expect("fused compare kernel must engage for sliced arrays") + .execute::(&mut ctx)?; + let want = prim + .clone() + .into_array() + .slice(start..start + slice_len)? + .binary(rhs.clone(), Operator::from(op))? + .execute::(&mut ctx)?; + assert_arrays_eq!(got, want); + } + Ok(()) + } + + /// Sliced *and* patched: combine a non-zero offset with out-of-range values that land in + /// `Patches`, exercising the `offset + (global - p_off)` patch-position math. + #[test] + fn sliced_with_patches_matches_primitive() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let values: Vec = (0..4096) + .map(|i| if i % 91 == 0 { 100_000 + i } else { i % 100 }) + .collect(); + let prim = PrimitiveArray::from_iter(values); + let packed = BitPackedData::encode(&prim.clone().into_array(), 7, &mut ctx)?; + assert!(packed.patches().is_some(), "test setup expects patches"); + + let (start, end) = (700usize, 3500usize); + let sliced = packed.into_array().slice(start..end)?; + let rhs = ConstantArray::new(50i32, end - start).into_array(); + let got = sliced + .binary(rhs.clone(), Operator::Eq)? + .execute::(&mut ctx)?; + let want = prim + .into_array() + .slice(start..end)? + .binary(rhs, Operator::Eq)? + .execute::(&mut ctx)?; + assert_arrays_eq!(got, want); + Ok(()) + } + /// Nullable input — the result must carry the array's validity. #[test] fn nullable_propagates_validity() -> VortexResult<()> { diff --git a/encodings/fastlanes/src/bitpacking/compute/compare_fused.rs b/encodings/fastlanes/src/bitpacking/compute/compare_fused.rs new file mode 100644 index 00000000000..5caba4118e7 --- /dev/null +++ b/encodings/fastlanes/src/bitpacking/compute/compare_fused.rs @@ -0,0 +1,177 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Fused compare kernel for [`BitPackedArray`] against a constant. +//! +//! Where [`super::stream_predicate`] unpacks a full 1024-element FastLanes block into a scratch +//! buffer and *then* folds a predicate over it, this path hands the comparison down into the +//! FastLanes [`BitPackingCompare::unchecked_unpack_cmp`] kernel, which compares each value against +//! the constant *as it is unpacked*, accumulating the boolean results straight into a 1024-bit +//! mask (`[u64; 16]`) in transposed FastLanes lane order - one register-resident word per lane, no +//! `[bool; 1024]` or `[T; 1024]` scratch. A single SIMD [`untranspose_bits`] per block then rotates +//! that mask into logical row order. +//! +//! The packed blocks are walked through the regular [`crate::unpack_iter::BitUnpackedChunks`] +//! iterator (via [`crate::unpack_iter::BitUnpackedChunks::for_each_packed_chunk`]) rather than a +//! bespoke chunk loop, so chunk sizing and bounds live in one place. +//! +//! Slicing is handled by working in *padded* coordinates: bit `offset + i` holds element `i`. The +//! output buffer is over-allocated to whole 1024-bit blocks, so every block - the sliced first +//! block, the body, and the trailing partial - untransposes straight into a 64-bit-word-aligned +//! slot with no per-block temporary and only one shared scratch `[u64; 16]`. The leading `offset` +//! garbage rows and the trailing padding are trimmed afterwards: leading whole `u64` words are +//! dropped with [`BufferMut::split_off`] (an O(1) re-slice) and the residual `offset % 64` bits plus +//! `len` form the final [`BitBufferMut`] view. The result is therefore byte-aligned (offset 0) when +//! `offset % 8 == 0`. Inline patches are spliced in afterwards by overwriting the bits at the +//! patched indices with `cmp(patch_value, rhs)`. + +use fastlanes::BitPacking; +use fastlanes::BitPackingCompare; +use fastlanes::FastLanesComparable; +use fastlanes::untranspose_bits; +use num_traits::AsPrimitive; +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::BoolArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::dtype::NativePType; +use vortex_array::dtype::Nullability; +use vortex_array::dtype::PhysicalPType; +use vortex_array::match_each_unsigned_integer_ptype; +use vortex_buffer::BitBufferMut; +use vortex_buffer::BufferMut; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; + +use super::stream_predicate::stream_predicate; +use crate::BitPacked; +use crate::BitPackedArrayExt; +use crate::unpack_iter::BitPacked as BitPackedIter; + +const CHUNK_SIZE: usize = 1024; +/// `u64` words spanning one FastLanes block (1024 bits / 64). +const WORDS_PER_CHUNK: usize = CHUNK_SIZE / u64::BITS as usize; + +/// Unpack one packed FastLanes block, comparing each value against `rhs` *as it is unpacked*, and +/// write the resulting 1024-bit mask (logical row order, LSB-first) into `out`. +/// +/// `transposed` is caller-owned scratch reused across every block, so the hot loop makes no +/// per-call stack allocation; its prior contents are irrelevant (the kernel overwrites it). +#[inline] +fn unpack_cmp_block( + transposed: &mut [u64; WORDS_PER_CHUNK], + out: &mut [u64; WORDS_PER_CHUNK], + bit_width: usize, + packed_chunk: &[::Physical], + cmp: F, + rhs: T, +) where + T: NativePType + + PhysicalPType + + FastLanesComparable::Physical>, + ::Physical: BitPacking + BitPackingCompare, + F: Fn(T, T) -> bool, +{ + transposed.fill(0); + // SAFETY: `packed_chunk` holds exactly `128 * bit_width / size_of::()` packed elements and + // `bit_width <= U::T`, satisfying `unchecked_unpack_cmp`'s contract. + unsafe { + <::Physical as BitPackingCompare>::unchecked_unpack_cmp::( + bit_width, + packed_chunk, + transposed, + cmp, + rhs, + ); + } + untranspose_bits::<::Physical>(transposed, out); +} + +/// Compare the unpacked values of a [`BitPackedArray`] against `rhs` using the fused FastLanes +/// `unpack_cmp` kernel, producing a [`BoolArray`]. +/// +/// `cmp(value, rhs)` defines the predicate; it must be the total-order comparison matching the +/// requested operator (e.g. `|a, b| a.is_lt(b)`). +pub(super) fn stream_compare_fused( + array: ArrayView<'_, BitPacked>, + rhs: T, + nullability: Nullability, + cmp: F, + ctx: &mut ExecutionCtx, +) -> VortexResult +where + T: NativePType + + BitPackedIter + + FastLanesComparable::Physical>, + ::Physical: BitPacking + NativePType + BitPackingCompare, + F: Fn(T, T) -> bool + Copy, +{ + let len = array.len(); + let bit_width = array.bit_width() as usize; + let offset = array.offset() as usize; + + // A degenerate width has no packed payload for the fused kernel to consume; defer to the scalar + // streaming predicate, which handles every layout (including the empty array). + if len == 0 || bit_width == 0 { + return stream_predicate::(array, nullability, move |v| cmp(v, rhs), ctx); + } + + // Over-allocate to whole 1024-bit blocks in padded coordinates so every block - including the + // trailing partial - has room for a full untranspose at a 64-bit-word-aligned offset. + let num_chunks = (offset + len).div_ceil(CHUNK_SIZE); + let mut words: BufferMut = BufferMut::zeroed(num_chunks * WORDS_PER_CHUNK); + + let chunks = array.unpacked_chunks::()?; + { + let words = words.as_mut_slice(); + let mut transposed = [0u64; WORDS_PER_CHUNK]; + chunks.for_each_packed_chunk(|packed_chunk, range| { + // Block starts are always 1024-aligned (padded coords), so the slot is a full block. + let out = words[range.start / u64::BITS as usize..] + .first_chunk_mut::() + .vortex_expect("over-allocated buffer holds a full block per chunk"); + unpack_cmp_block::(&mut transposed, out, bit_width, packed_chunk, cmp, rhs); + }); + + // Patched indices hold placeholder packed values, so their fused result is meaningless; + // overwrite each with the comparison against the real patch value. Patch positions are + // logical (`global - p_off`); shift by `offset` into padded coordinates. + if let Some(p) = array.patches() { + let p_idx = p.indices().clone().execute::(ctx)?; + let p_val = p.values().clone().execute::(ctx)?; + let p_off = p.offset(); + match_each_unsigned_integer_ptype!(p_idx.ptype(), |I| { + let indices = p_idx.as_slice::(); + let values = p_val.as_slice::(); + for (&global, &value) in indices.iter().zip(values) { + let global: usize = global.as_(); + set_bit(words, offset + global - p_off, cmp(value, rhs)); + } + }); + } + } + + // Trim the leading garbage: drop whole `u64` words covered entirely by the skipped `offset` + // region (an O(1) re-slice; `u64` granularity keeps the byte buffer 8-aligned). The residual + // `offset % 64` bits become the view offset, so the result is byte-aligned when `offset % 8 == 0` + // and offset 0 when `offset % 64 == 0`. + let head_words = offset / u64::BITS as usize; + let bit_offset = offset % u64::BITS as usize; + let bytes = words.split_off(head_words).into_byte_buffer(); + let bits = BitBufferMut::from_buffer(bytes, bit_offset, len); + let validity = array.validity()?.union_nullability(nullability); + Ok(BoolArray::new(bits.freeze(), validity).into_array()) +} + +/// Branchlessly write a single bit in a packed `u64` word buffer: clear the bit, then OR in the +/// new value. Avoids a data-dependent branch per patch in the patch-fixup loop, and touches the +/// target word through a single bounds-checked `&mut`. +#[inline] +fn set_bit(words: &mut [u64], idx: usize, value: bool) { + let shift = idx % u64::BITS as usize; + let mask = 1u64 << shift; + let word = &mut words[idx / u64::BITS as usize]; + *word = (*word & !mask) | (u64::from(value) << shift); +} diff --git a/encodings/fastlanes/src/bitpacking/compute/mod.rs b/encodings/fastlanes/src/bitpacking/compute/mod.rs index 518f8319eb1..06a4b4597b0 100644 --- a/encodings/fastlanes/src/bitpacking/compute/mod.rs +++ b/encodings/fastlanes/src/bitpacking/compute/mod.rs @@ -4,6 +4,7 @@ mod between; mod cast; mod compare; +mod compare_fused; mod filter; pub(crate) mod is_constant; mod slice;