diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 317f0869dac..2345e4d890b 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -3368,6 +3368,10 @@ pub struct vortex_array::arrays::patched::Patched impl vortex_array::arrays::patched::Patched +pub const vortex_array::arrays::patched::Patched::ID: vortex_array::ArrayId + +impl vortex_array::arrays::patched::Patched + pub fn vortex_array::arrays::patched::Patched::from_array_and_patches(inner: vortex_array::ArrayRef, patches: &vortex_array::patches::Patches, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> impl core::clone::Clone for vortex_array::arrays::patched::Patched @@ -3584,6 +3588,8 @@ pub fn T::patch_values(&self) -> &vortex_array::ArrayRef pub fn T::slots_view(&self) -> vortex_array::arrays::patched::PatchedSlotsView<'_> +pub fn vortex_array::arrays::patched::apply_patches_primitive(output: &mut [V], offset: usize, len: usize, n_lanes: usize, lane_offsets: &[u32], indices: &[u16], values: &[V]) + pub type vortex_array::arrays::patched::PatchedArray = vortex_array::Array pub mod vortex_array::arrays::primitive @@ -6218,6 +6224,10 @@ pub struct vortex_array::arrays::Patched impl vortex_array::arrays::patched::Patched +pub const vortex_array::arrays::patched::Patched::ID: vortex_array::ArrayId + +impl vortex_array::arrays::patched::Patched + pub fn vortex_array::arrays::patched::Patched::from_array_and_patches(inner: vortex_array::ArrayRef, patches: &vortex_array::patches::Patches, ctx: &mut vortex_array::ExecutionCtx) -> vortex_error::VortexResult> impl core::clone::Clone for vortex_array::arrays::patched::Patched diff --git a/vortex-array/src/arrays/patched/vtable/mod.rs b/vortex-array/src/arrays/patched/vtable/mod.rs index c49e681a411..83930c9e741 100644 --- a/vortex-array/src/arrays/patched/vtable/mod.rs +++ b/vortex-array/src/arrays/patched/vtable/mod.rs @@ -57,6 +57,11 @@ pub type PatchedArray = Array; #[derive(Clone, Debug)] pub 
struct Patched; +impl Patched { + /// The array ID for Patched arrays. + pub const ID: ArrayId = ArrayId::new_ref("vortex.patched"); +} + impl ValidityChild for Patched { fn validity_child(array: ArrayView<'_, Patched>) -> ArrayRef { array.inner().clone() @@ -99,7 +104,7 @@ impl VTable for Patched { type ValidityVTable = ValidityVTableFromChild; fn id(&self) -> ArrayId { - ArrayId::new_ref("vortex.patched") + Self::ID } fn validate( @@ -318,7 +323,12 @@ impl VTable for Patched { } /// Apply patches on top of the existing value types. -fn apply_patches_primitive( +/// +/// This function is used to overwrite values in the output buffer with patch values +/// at the specified indices. It handles the chunked layout where patches are organized +/// by lanes within 1024-element chunks. +#[allow(clippy::too_many_arguments)] +pub fn apply_patches_primitive( output: &mut [V], offset: usize, len: usize, diff --git a/vortex-cuda/kernels/src/patched.cu b/vortex-cuda/kernels/src/patched.cu new file mode 100644 index 00000000000..261977d12a4 --- /dev/null +++ b/vortex-cuda/kernels/src/patched.cu @@ -0,0 +1,65 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#include "types.cuh" + +/// Apply patches to an output array using the transposed Patched array format. +/// +/// This kernel uses a thread-per-lane model where each thread is assigned to +/// one (chunk, lane) slot and applies all patches in that slot. 
+template +__device__ void patched(ValueT *const output, + const uint32_t *const lane_offsets, + const uint16_t *const patch_indices, + const ValueT *const patch_values, + uint32_t n_lanes, + uint32_t total_lane_slots, + uint64_t offset, + uint64_t len) { + const uint32_t lane_slot = blockIdx.x * blockDim.x + threadIdx.x; + + // Early return if this thread is beyond the number of lane slots + if (lane_slot >= total_lane_slots) { + return; + } + + // Determine which chunk this lane slot belongs to + const uint32_t chunk = lane_slot / n_lanes; + + // Get the range of patches for this lane slot + const uint32_t start = lane_offsets[lane_slot]; + const uint32_t stop = lane_offsets[lane_slot + 1]; + + // Apply all patches in this lane + for (uint32_t p = start; p < stop; p++) { + // Get within-chunk index and compute global position + const uint16_t within_chunk_idx = patch_indices[p]; + const uint64_t global_idx = static_cast(chunk) * 1024 + within_chunk_idx; + + // Check bounds (for sliced arrays) + if (global_idx < offset) { + continue; + } + + if (global_idx >= offset + len) { + break; + } + + output[global_idx - offset] = patch_values[p]; + } +} + +#define GENERATE_PATCHED_KERNEL(value_suffix, ValueT) \ + extern "C" __global__ void patched_##value_suffix(ValueT *const output, \ + const uint32_t *const lane_offsets, \ + const uint16_t *const patch_indices, \ + const ValueT *const patch_values, \ + uint32_t n_lanes, \ + uint32_t total_lane_slots, \ + uint64_t offset, \ + uint64_t len) { \ + patched(output, lane_offsets, patch_indices, patch_values, n_lanes, total_lane_slots, offset, len); \ + } + +// Generate for all native SIMD ptypes +FOR_EACH_NATIVE_SIMD_PTYPE(GENERATE_PATCHED_KERNEL) diff --git a/vortex-cuda/src/kernel/encodings/bitpacked.rs b/vortex-cuda/src/kernel/encodings/bitpacked.rs index 357d444fae4..2974cdb6794 100644 --- a/vortex-cuda/src/kernel/encodings/bitpacked.rs +++ b/vortex-cuda/src/kernel/encodings/bitpacked.rs @@ -23,12 +23,14 @@ use 
vortex::encodings::fastlanes::unpack_iter::BitPacked as BitPackedUnpack; use vortex::error::VortexResult; use vortex::error::vortex_ensure; use vortex::error::vortex_err; +use vortex_error::vortex_bail; use crate::CudaBufferExt; use crate::CudaDeviceBuffer; use crate::executor::CudaExecute; use crate::executor::CudaExecutionCtx; use crate::kernel::patches::gpu::GPUPatches; +use crate::kernel::patches::types::DevicePatches; use crate::kernel::patches::types::transpose_patches; /// CUDA decoder for bit-packed arrays. @@ -53,7 +55,7 @@ impl CudaExecute for BitPackedExecutor { Self::try_specialize(array).ok_or_else(|| vortex_err!("Expected BitPackedArray"))?; match_each_integer_ptype!(array.ptype(array.dtype()), |A| { - decode_bitpacked::(array, A::default(), ctx).await + decode_bitpacked::(array, A::default(), None, ctx).await }) } } @@ -90,6 +92,7 @@ unsafe impl DeviceRepr for GPUPatches {} pub(crate) async fn decode_bitpacked( array: BitPackedArray, reference: A, + device_patches: Option, ctx: &mut CudaExecutionCtx, ) -> VortexResult where @@ -101,7 +104,7 @@ where bit_width, len, packed, - patches, + patches: interior_patches, validity, } = BitPacked::into_parts(array); @@ -122,11 +125,14 @@ where let cuda_function = bitpacked_cuda_kernel(bit_width, output_width, ctx)?; let config = bitpacked_cuda_launch_config(output_width, len)?; - // We hold this here to keep the device buffers alive. - let device_patches = if let Some(patches) = patches { - Some(transpose_patches(&patches, ctx).await?) 
- } else { - None + // Execute the patch kind to get device patches + let device_patches = match (interior_patches, device_patches) { + (None, None) => None, + (Some(patches), None) => Some(transpose_patches(&patches, ctx).await?), + (None, Some(device_patches)) => Some(device_patches), + (Some(_), Some(_)) => { + vortex_bail!("Cannot execute bitpacked array with interior and exterior patches") + } }; let patches_arg = if let Some(p) = &device_patches { diff --git a/vortex-cuda/src/kernel/encodings/for_.rs b/vortex-cuda/src/kernel/encodings/for_.rs index 490b797b9b8..bcda82d2951 100644 --- a/vortex-cuda/src/kernel/encodings/for_.rs +++ b/vortex-cuda/src/kernel/encodings/for_.rs @@ -56,7 +56,7 @@ impl CudaExecute for FoRExecutor { if let Some(bitpacked) = array.encoded().as_opt::() { match_each_integer_ptype!(bitpacked.ptype(bitpacked.dtype()), |P| { let reference: P = array.reference_scalar().try_into()?; - return decode_bitpacked(bitpacked.into_owned(), reference, ctx).await; + return decode_bitpacked(bitpacked.into_owned(), reference, None, ctx).await; }) } @@ -67,7 +67,7 @@ impl CudaExecute for FoRExecutor { let slice_range = slice_array.slice_range().clone(); let unpacked = match_each_integer_ptype!(bitpacked.ptype(bitpacked.dtype()), |P| { let reference: P = array.reference_scalar().try_into()?; - decode_bitpacked(bitpacked.into_owned(), reference, ctx).await? + decode_bitpacked(bitpacked.into_owned(), reference, None, ctx).await? 
}); return unpacked diff --git a/vortex-cuda/src/kernel/encodings/mod.rs b/vortex-cuda/src/kernel/encodings/mod.rs index 62a8d9f606d..5721b87af58 100644 --- a/vortex-cuda/src/kernel/encodings/mod.rs +++ b/vortex-cuda/src/kernel/encodings/mod.rs @@ -6,6 +6,7 @@ mod bitpacked; mod date_time_parts; mod decimal_byte_parts; mod for_; +mod patched; mod runend; mod sequence; mod zigzag; @@ -18,6 +19,7 @@ pub(crate) use bitpacked::BitPackedExecutor; pub(crate) use date_time_parts::DateTimePartsExecutor; pub(crate) use decimal_byte_parts::DecimalBytePartsExecutor; pub(crate) use for_::FoRExecutor; +pub(crate) use patched::PatchedExecutor; pub(crate) use runend::RunEndExecutor; pub(crate) use sequence::SequenceExecutor; pub(crate) use zigzag::ZigZagExecutor; diff --git a/vortex-cuda/src/kernel/encodings/patched.rs b/vortex-cuda/src/kernel/encodings/patched.rs new file mode 100644 index 00000000000..e3cd74988b0 --- /dev/null +++ b/vortex-cuda/src/kernel/encodings/patched.rs @@ -0,0 +1,498 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Debug; + +use async_trait::async_trait; +use cudarc::driver::DeviceRepr; +use cudarc::driver::PushKernelArg; +use tracing::instrument; +use vortex::array::ArrayRef; +use vortex::array::Canonical; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::primitive::PrimitiveDataParts; +use vortex::array::match_each_integer_ptype; +use vortex::dtype::NativePType; +use vortex::encodings::fastlanes::BitPacked; +use vortex::encodings::fastlanes::BitPackedArrayExt; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex_array::arrays::PatchedArray; +use vortex_array::arrays::patched::Patched; +use vortex_array::arrays::patched::PatchedArrayExt; +use vortex_array::arrays::patched::PatchedArraySlotsExt; +use vortex_array::buffer::BufferHandle; +use vortex_array::match_each_native_simd_ptype; + +use crate::CudaBufferExt; +use 
crate::executor::CudaArrayExt; +use crate::executor::CudaExecute; +use crate::executor::CudaExecutionCtx; +use crate::kernel::encodings::bitpacked::decode_bitpacked; +use crate::kernel::patches::types::DevicePatches; + +/// CUDA decoder for Patched arrays. +/// +/// When the inner child is BitPacked, fuses patching with bit-unpacking to avoid +/// an additional kernel dispatch. +#[derive(Debug)] +pub(crate) struct PatchedExecutor; + +impl PatchedExecutor { + fn try_specialize(array: ArrayRef) -> Option { + array.try_downcast::().ok() + } +} + +#[async_trait] +impl CudaExecute for PatchedExecutor { + #[instrument(level = "trace", skip_all, fields(executor = ?self))] + async fn execute( + &self, + array: ArrayRef, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let array = + Self::try_specialize(array).ok_or_else(|| vortex_err!("Expected PatchedArray"))?; + + // Check if the inner child is BitPacked - if so, we can fuse patching with unpacking + if let Some(bitpacked) = array.inner().as_opt::() { + // The inner BitPacked should not have its own interior patches since they've + // been externalized into the Patched wrapper + if bitpacked.patches().is_some() { + return Err(vortex_err!( + "Patched(BitPacked) should not have interior patches in BitPacked child" + )); + } + + // Execute the components + let lane_offsets = array + .lane_offsets() + .clone() + .execute_cuda(ctx) + .await? + .into_primitive() + .into_data_parts() + .buffer; + + let patch_indices = array + .patch_indices() + .clone() + .execute_cuda(ctx) + .await? + .into_primitive() + .into_data_parts() + .buffer; + + let patch_values = array + .patch_values() + .clone() + .execute_cuda(ctx) + .await? + .into_primitive() + .into_data_parts() + .buffer; + + match_each_integer_ptype!(bitpacked.ptype(bitpacked.dtype()), |P| { + return decode_bitpacked::
<P>
( + bitpacked.into_owned(), + P::default(), + Some(DevicePatches { + lane_offsets: ctx.ensure_on_device(lane_offsets).await?, + indices: ctx.ensure_on_device(patch_indices).await?, + values: ctx.ensure_on_device(patch_values).await?, + }), + ctx, + ) + .await; + }) + } + + // Fallback: execute inner on GPU, then apply patches using GPU kernel + let n_lanes = array.n_lanes(); + let offset = array.offset(); + let len = array.as_ref().len(); + + // Execute inner on GPU to get the base values + let inner_canonical = array.inner().clone().execute_cuda(ctx).await?; + let inner_primitive = inner_canonical.into_primitive(); + let validity = inner_primitive.validity()?.clone(); + let ptype = inner_primitive.ptype(); + + // Get the inner buffer on device + let PrimitiveDataParts { buffer, .. } = inner_primitive.into_data_parts(); + let d_output = ctx.ensure_on_device(buffer).await?; + + // Execute patch components on GPU + let lane_offsets = array.lane_offsets().clone().execute_cuda(ctx).await?; + let lane_offsets_prim = lane_offsets.into_primitive(); + + // one thread per lane, i.e. lane_offsets.len() - 1 + let n_threads = lane_offsets_prim.len().saturating_sub(1); + + let PrimitiveDataParts { + buffer: lane_offsets_buffer, + .. + } = lane_offsets_prim.into_data_parts(); + let d_lane_offsets = ctx.ensure_on_device(lane_offsets_buffer).await?; + + let patch_indices = array.patch_indices().clone().execute_cuda(ctx).await?; + let patch_indices_prim = patch_indices.into_primitive(); + let PrimitiveDataParts { + buffer: patch_indices_buffer, + .. + } = patch_indices_prim.into_data_parts(); + let d_patch_indices = ctx.ensure_on_device(patch_indices_buffer).await?; + + let patch_values = array.patch_values().clone().execute_cuda(ctx).await?; + let patch_values_prim = patch_values.into_primitive(); + let PrimitiveDataParts { + buffer: patch_values_buffer, + .. 
+ } = patch_values_prim.into_data_parts(); + let d_patch_values = ctx.ensure_on_device(patch_values_buffer).await?; + + // Apply patches on GPU using thread-per-lane model + match_each_native_simd_ptype!(ptype, |V| { + let patched_buffer = execute_patched::( + d_output, + d_lane_offsets, + d_patch_indices, + d_patch_values, + n_threads, + n_lanes, + offset, + len, + ctx, + )?; + + Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle( + patched_buffer, + ptype, + validity, + ))) + }) + } +} + +/// Apply patches to an output buffer using the Patched array GPU kernel. +/// +/// Uses a thread-per-lane model where each thread handles one (chunk, lane) slot +/// and applies all patches in that slot. +/// +/// `n_threads` is the number of threads to execute the kernel with, which should +/// be equal to `lane_offsets.len() - 1`, i.e. one per lane. +#[instrument(skip_all)] +#[allow(clippy::too_many_arguments)] +fn execute_patched( + output: BufferHandle, + lane_offsets: BufferHandle, + patch_indices: BufferHandle, + patch_values: BufferHandle, + n_threads: usize, + n_lanes: usize, + offset: usize, + len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + if n_threads == 0 { + // No lanes to process + return Ok(output); + } + + let d_output_view = output.cuda_view::()?; + let d_lane_offsets_view = lane_offsets.cuda_view::()?; + let d_patch_indices_view = patch_indices.cuda_view::()?; + let d_patch_values_view = patch_values.cuda_view::()?; + + let n_lanes_u32 = u32::try_from(n_lanes)?; + let total_lane_slots_u32 = u32::try_from(n_threads)?; + let offset_u64 = offset as u64; + let len_u64 = len as u64; + + let kernel_func = ctx.load_function("patched", &[T::PTYPE])?; + + // Launch with one thread per lane slot + ctx.launch_kernel(&kernel_func, n_threads, |args| { + args.arg(&d_output_view) + .arg(&d_lane_offsets_view) + .arg(&d_patch_indices_view) + .arg(&d_patch_values_view) + .arg(&n_lanes_u32) + .arg(&total_lane_slots_u32) + .arg(&offset_u64) + 
.arg(&len_u64); + })?; + + Ok(output) +} + +#[cfg(test)] +mod tests { + use futures::executor::block_on; + use vortex::array::IntoArray; + use vortex::array::arrays::PrimitiveArray; + use vortex::array::assert_arrays_eq; + use vortex::array::validity::Validity::NonNullable; + use vortex::buffer::Buffer; + use vortex::encodings::fastlanes::BitPacked; + use vortex::error::VortexExpect; + use vortex::error::VortexResult; + use vortex::session::VortexSession; + use vortex_array::ExecutionCtx; + use vortex_array::arrays::Patched; + use vortex_array::patches::Patches; + + use super::*; + use crate::CanonicalCudaExt; + use crate::session::CudaSession; + + #[crate::test] + fn test_patched_bitpacked() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + // Create a primitive array with values that all fit in 6 bits (0-63) + // We'll add patches for some positions manually + let mut values: Vec = (0u16..64).cycle().take(2048).collect(); + // Set the patch positions to filler values (0) + values[100] = 0; + values[500] = 0; + values[1000] = 0; + values[1500] = 0; + + let array = PrimitiveArray::new(Buffer::from(values), NonNullable); + + // Encode with 6 bits - all values fit, so no internal patches + let bp_array = BitPacked::encode(&array.into_array(), 6)?; + assert!(bp_array.patches().is_none()); + + // Create patches for the positions we zeroed out + let patches = Patches::new( + 2048, + 0, + PrimitiveArray::from_iter([100u32, 500, 1000, 1500]).into_array(), + PrimitiveArray::from_iter([1000u16, 2000, 3000, 4000]).into_array(), + None, + )?; + + let session = VortexSession::empty(); + let mut exec_ctx = ExecutionCtx::new(session); + let patched_array = + Patched::from_array_and_patches(bp_array.into_array(), &patches, &mut exec_ctx)?; + + let cpu_result = patched_array.to_canonical()?.into_array(); + + let gpu_result = block_on(async { + PatchedExecutor + 
.execute(patched_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU decompression failed") + .into_host() + .await + .map(|a| a.into_array()) + })?; + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } + + #[crate::test] + fn test_patched_primitive_fallback() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + // Create a primitive array + let values = PrimitiveArray::new((0u16..1024).collect::>(), NonNullable); + + // Create patches for some values + let patches = Patches::new( + 1024, + 0, + PrimitiveArray::from_iter([100u32, 200, 300, 400]).into_array(), + PrimitiveArray::from_iter([9999u16, 8888, 7777, 6666]).into_array(), + None, + )?; + + let session = VortexSession::empty(); + let mut exec_ctx = ExecutionCtx::new(session); + let patched_array = + Patched::from_array_and_patches(values.into_array(), &patches, &mut exec_ctx)?; + + let cpu_result = patched_array.to_canonical()?.into_array(); + + // This should use the GPU kernel fallback since inner is Primitive, not BitPacked + let gpu_result = block_on(async { + PatchedExecutor + .execute(patched_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU decompression failed") + .into_host() + .await + .map(|a| a.into_array()) + })?; + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } + + #[crate::test] + fn test_patched_bitpacked_sliced() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + // Create a primitive array with values that all fit in 6 bits (0-63) + let mut values: Vec = (0u16..64).cycle().take(2048).collect(); + // Set the patch positions to filler values (0) + values[100] = 0; + values[500] = 0; + values[1000] = 0; + values[1500] = 0; + + let array = PrimitiveArray::new(Buffer::from(values), NonNullable); + + // Encode with 6 bits - all 
values fit, so no internal patches + let bp_array = BitPacked::encode(&array.into_array(), 6)?; + assert!(bp_array.patches().is_none()); + + // Create patches for the positions we zeroed out + let patches = Patches::new( + 2048, + 0, + PrimitiveArray::from_iter([100u32, 500, 1000, 1500]).into_array(), + PrimitiveArray::from_iter([1000u16, 2000, 3000, 4000]).into_array(), + None, + )?; + + let session = VortexSession::empty(); + let mut exec_ctx = ExecutionCtx::new(session); + let patched_array = + Patched::from_array_and_patches(bp_array.into_array(), &patches, &mut exec_ctx)?; + + // Slice starting after the first patch but before the second + let sliced = patched_array.into_array().slice(200..1800)?; + + let cpu_result = sliced.to_canonical()?.into_array(); + + let gpu_result = block_on(async { + PatchedExecutor + .execute(sliced, &mut cuda_ctx) + .await + .vortex_expect("GPU decompression failed") + .into_host() + .await + .map(|a| a.into_array()) + })?; + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } + + #[crate::test] + fn test_patched_bitpacked_multi_chunk() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + // Create a large array spanning multiple FL chunks (each chunk is 1024 elements) + // 5 chunks = 5120 elements + let mut values: Vec = (0u32..256).cycle().take(5120).collect(); + // Set patch positions to filler values - one in each chunk + values[100] = 0; // chunk 0 + values[1200] = 0; // chunk 1 + values[2300] = 0; // chunk 2 + values[3400] = 0; // chunk 3 + values[4500] = 0; // chunk 4 + + let array = PrimitiveArray::new(Buffer::from(values), NonNullable); + + // Encode with 8 bits - all values fit (0-255), so no internal patches + let bp_array = BitPacked::encode(&array.into_array(), 8)?; + assert!(bp_array.patches().is_none()); + + // Create patches across multiple chunks + let patches = Patches::new( + 5120, + 0, + 
PrimitiveArray::from_iter([100u32, 1200, 2300, 3400, 4500]).into_array(), + PrimitiveArray::from_iter([10000u32, 20000, 30000, 40000, 50000]).into_array(), + None, + )?; + + let session = VortexSession::empty(); + let mut exec_ctx = ExecutionCtx::new(session); + let patched_array = + Patched::from_array_and_patches(bp_array.into_array(), &patches, &mut exec_ctx)?; + + let cpu_result = patched_array.to_canonical()?.into_array(); + + let gpu_result = block_on(async { + PatchedExecutor + .execute(patched_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU decompression failed") + .into_host() + .await + .map(|a| a.into_array()) + })?; + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } + + #[crate::test] + fn test_patched_bitpacked_multi_chunk_sliced() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + // Create a large array spanning multiple FL chunks + let mut values: Vec = (0u32..256).cycle().take(4096).collect(); + // Set patch positions to filler values - one in each chunk + values[100] = 0; // chunk 0 + values[1200] = 0; // chunk 1 + values[2300] = 0; // chunk 2 + values[3400] = 0; // chunk 3 + + let array = PrimitiveArray::new(Buffer::from(values), NonNullable); + + // Encode with 8 bits + let bp_array = BitPacked::encode(&array.into_array(), 8)?; + assert!(bp_array.patches().is_none()); + + // Create patches across multiple chunks + let patches = Patches::new( + 4096, + 0, + PrimitiveArray::from_iter([100u32, 1200, 2300, 3400]).into_array(), + PrimitiveArray::from_iter([10000u32, 20000, 30000, 40000]).into_array(), + None, + )?; + + let session = VortexSession::empty(); + let mut exec_ctx = ExecutionCtx::new(session); + let patched_array = + Patched::from_array_and_patches(bp_array.into_array(), &patches, &mut exec_ctx)?; + + // Slice across chunk boundaries (from middle of chunk 1 to middle of chunk 3) + let sliced = 
patched_array.into_array().slice(1500..3000)?; + + let cpu_result = sliced.to_canonical()?.into_array(); + + let gpu_result = block_on(async { + PatchedExecutor + .execute(sliced, &mut cuda_ctx) + .await + .vortex_expect("GPU decompression failed") + .into_host() + .await + .map(|a| a.into_array()) + })?; + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } +} diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index 274fe31b18f..6c659232c0b 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -39,6 +39,7 @@ use kernel::DictExecutor; use kernel::FilterExecutor; use kernel::FoRExecutor; pub use kernel::LaunchStrategy; +use kernel::PatchedExecutor; use kernel::RunEndExecutor; use kernel::SharedExecutor; pub use kernel::TracingLaunchStrategy; @@ -62,6 +63,7 @@ pub use stream_pool::VortexCudaStreamPool; use vortex::array::arrays::Constant; use vortex::array::arrays::Dict; use vortex::array::arrays::Filter; +use vortex::array::arrays::Patched; use vortex::array::arrays::Shared; use vortex::array::arrays::Slice; use vortex::encodings::alp::ALP; @@ -99,6 +101,7 @@ pub fn initialize_cuda(session: &CudaSession) { session.register_kernel(DateTimeParts::ID, &DateTimePartsExecutor); session.register_kernel(DecimalByteParts::ID, &DecimalBytePartsExecutor); session.register_kernel(Dict::ID, &DictExecutor); + session.register_kernel(Patched::ID, &PatchedExecutor); session.register_kernel(Shared::ID, &SharedExecutor); session.register_kernel(FoR::ID, &FoRExecutor); session.register_kernel(RunEnd::ID, &RunEndExecutor);