From 44fd21b19a70e5e5b1c0db5c1d49a3de1636de1e Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 7 Apr 2026 23:13:40 -0400 Subject: [PATCH 01/17] Initial buffer allocator Signed-off-by: Nicholas Gates --- .../src/arrays/chunked/vtable/canonical.rs | 99 +++++++++++++++++-- vortex-array/src/builders/mod.rs | 12 +++ vortex-array/src/executor.rs | 7 ++ vortex-array/src/lib.rs | 1 + vortex-datafusion/src/persistent/format.rs | 7 +- vortex-datafusion/src/persistent/reader.rs | 4 +- vortex-file/src/open.rs | 81 ++++++++++++++- vortex-io/src/object_store/read_at.rs | 39 ++++++-- vortex-io/src/std_file/read_at.rs | 26 ++++- 9 files changed, 247 insertions(+), 29 deletions(-) diff --git a/vortex-array/src/arrays/chunked/vtable/canonical.rs b/vortex-array/src/arrays/chunked/vtable/canonical.rs index 73a9b1c856a..a090b8b7928 100644 --- a/vortex-array/src/arrays/chunked/vtable/canonical.rs +++ b/vortex-array/src/arrays/chunked/vtable/canonical.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use vortex_buffer::BufferMut; +use vortex_buffer::Buffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -19,12 +19,13 @@ use crate::arrays::chunked::ChunkedArrayExt; use crate::arrays::listview::ListViewArrayExt; use crate::arrays::listview::ListViewRebuildMode; use crate::arrays::struct_::StructArrayExt; -use crate::builders::builder_with_capacity; +use crate::builders::builder_with_capacity_in; use crate::builtins::ArrayBuiltins; use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; use crate::dtype::StructFields; +use crate::memory::BufferAllocatorExt; use crate::validity::Validity; pub(super) fn _canonicalize( @@ -56,7 +57,7 @@ pub(super) fn _canonicalize( ctx, )?), _ => { - let mut builder = builder_with_capacity(array.dtype(), array.len()); + let mut builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len()); array.array().append_to_builder(builder.as_mut(), ctx)?; builder.finish_into_canonical() } @@ -131,8 +132,12 @@ fn swizzle_list_chunks( // this much more complicated. // We (somewhat arbitrarily) choose `u64` for our offsets and sizes here. These can always be // narrowed later by the compressor. - let mut offsets = BufferMut::::with_capacity(len); - let mut sizes = BufferMut::::with_capacity(len); + let allocator = ctx.allocator(); + let mut offsets = allocator.allocate_host_typed::(len)?; + let mut sizes = allocator.allocate_host_typed::(len)?; + let offsets_out: &mut [u64] = offsets.as_mut_slice_typed::()?; + let sizes_slice_out: &mut [u64] = sizes.as_mut_slice_typed::()?; + let mut next_list = 0usize; for chunk in chunks { let chunk_array = chunk.clone().execute::(ctx)?; @@ -162,19 +167,31 @@ fn swizzle_list_chunks( let sizes_slice = sizes_arr.as_slice::(); // Append offsets and sizes, adjusting offsets to point into the combined array. - offsets.extend(offsets_slice.iter().map(|o| o + num_elements)); - sizes.extend(sizes_slice); + for (&offset, &size) in offsets_slice.iter().zip(sizes_slice.iter()) { + offsets_out[next_list] = offset + num_elements; + sizes_slice_out[next_list] = size; + next_list += 1; + } num_elements += chunk_array.elements().len() as u64; } + debug_assert_eq!(next_list, len); // SAFETY: elements are sliced from valid `ListViewArray`s (from `to_listview()`). let chunked_elements = unsafe { ChunkedArray::new_unchecked(list_elements_chunks, elem_dtype.clone()) } .into_array(); - let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable).into_array(); - let sizes = PrimitiveArray::new(sizes.freeze(), Validity::NonNullable).into_array(); + let offsets = PrimitiveArray::new( + Buffer::::from_byte_buffer(offsets.freeze()), + Validity::NonNullable, + ) + .into_array(); + let sizes = PrimitiveArray::new( + Buffer::::from_byte_buffer(sizes.freeze()), + Validity::NonNullable, + ) + .into_array(); // SAFETY: // - `offsets` and `sizes` are non-nullable u64 arrays of the same length @@ -192,9 +209,15 @@ fn swizzle_list_chunks( #[cfg(test)] mod tests { use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + use crate::Canonical; + use crate::ExecutionCtx; use crate::IntoArray; use crate::ToCanonical; use crate::accessor::ArrayAccessor; @@ -207,8 +230,28 @@ mod tests { use crate::dtype::DType::Primitive; use crate::dtype::Nullability::NonNullable; use crate::dtype::PType::I32; + use crate::memory::BufferAllocator; + use crate::memory::DefaultBufferAllocator; + use crate::memory::MemorySessionExt; + use crate::memory::WritableHostBuffer; use crate::validity::Validity; + #[derive(Debug)] + struct CountingAllocator { + allocations: Arc, + } + + impl BufferAllocator for CountingAllocator { + fn allocate_host( + &self, + len: usize, + alignment: vortex_buffer::Alignment, + ) -> VortexResult { + self.allocations.fetch_add(1, Ordering::Relaxed); + DefaultBufferAllocator.allocate_host(len, alignment) + } + } + #[test] pub fn pack_nested_structs() { let struct_array = StructArray::try_new( @@ -265,4 +308,42 @@ mod tests { assert_eq!(l1.scalar_at(0).unwrap(), canon_values.scalar_at(0).unwrap()); assert_eq!(l2.scalar_at(0).unwrap(), canon_values.scalar_at(1).unwrap()); } + + #[test] + fn list_canonicalize_uses_memory_session_allocator() { + let allocations = Arc::new(AtomicUsize::new(0)); + let session = VortexSession::empty(); + session + .memory_mut() + .set_allocator(Arc::new(CountingAllocator { + allocations: Arc::clone(&allocations), + })); + let mut ctx = ExecutionCtx::new(session); + + let l1 = ListArray::try_new( + buffer![1, 2, 3, 4].into_array(), + buffer![0, 3].into_array(), + Validity::NonNullable, + ) + .unwrap(); + let l2 = ListArray::try_new( + buffer![5, 6].into_array(), + buffer![0, 2].into_array(), + Validity::NonNullable, + ) + .unwrap(); + + let chunked_list = ChunkedArray::try_new( + vec![l1.into_array(), l2.into_array()], + List(Arc::new(Primitive(I32, NonNullable)), NonNullable), + ) + .unwrap() + .into_array(); + + drop(chunked_list.execute::(&mut ctx).unwrap()); + assert!( + allocations.load(Ordering::Relaxed) >= 2, + "expected offset+size allocations through MemorySession" + ); + } } diff --git a/vortex-array/src/builders/mod.rs b/vortex-array/src/builders/mod.rs index f9914c8fb84..44edb4ebfa7 100644 --- a/vortex-array/src/builders/mod.rs +++ b/vortex-array/src/builders/mod.rs @@ -39,6 +39,7 @@ use crate::canonical::Canonical; use crate::dtype::DType; use crate::match_each_decimal_value_type; use crate::match_each_native_ptype; +use crate::memory::BufferAllocatorRef; use crate::scalar::Scalar; mod lazy_null_builder; @@ -285,3 +286,14 @@ pub fn builder_with_capacity(dtype: &DType, capacity: usize) -> Box Box { + let _allocator = allocator; + builder_with_capacity(dtype, capacity) +} diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 623410938d2..591c8e0c45c 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -34,6 +34,8 @@ use crate::ArrayRef; use crate::Canonical; use crate::IntoArray; use crate::matcher::Matcher; +use crate::memory::BufferAllocatorRef; +use crate::memory::MemorySessionExt; use crate::optimizer::ArrayOptimizer; /// Maximum number of iterations to attempt when executing an array before giving up and returning @@ -210,6 +212,11 @@ impl ExecutionCtx { &self.session } + /// Get the session-scoped host allocator for this execution context. + pub fn allocator(&self) -> BufferAllocatorRef { + self.session.allocator() + } + /// Log an execution step at the current depth. /// /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level. diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index cb0956b8f47..e84c83c0cf6 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -52,6 +52,7 @@ pub mod kernel; pub mod mask; mod mask_future; pub mod matcher; +pub mod memory; mod metadata; pub mod normalize; pub mod optimizer; diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 0be2086c885..9b16cfd85aa 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -43,6 +43,7 @@ use futures::stream; use object_store::ObjectMeta; use object_store::ObjectStore; use vortex::VortexSessionDefault; +use vortex::array::memory::MemorySessionExt; use vortex::dtype::DType; use vortex::dtype::Nullability; use vortex::dtype::PType; @@ -265,10 +266,11 @@ impl FileFormat for VortexFormat { } // Not entry or invalid - open the file - let reader = Arc::new(ObjectStoreReadAt::new( + let reader = Arc::new(ObjectStoreReadAt::new_with_allocator( store, object.location.clone(), session.handle(), + session.allocator(), )); let vxf = session @@ -337,10 +339,11 @@ impl FileFormat for VortexFormat { Some(metadata) => metadata, None => { // Not entry - open the file - let reader = Arc::new(ObjectStoreReadAt::new( + let reader = Arc::new(ObjectStoreReadAt::new_with_allocator( store, object.location.clone(), session.handle(), + session.allocator(), )); let vxf = session diff --git a/vortex-datafusion/src/persistent/reader.rs b/vortex-datafusion/src/persistent/reader.rs index 4e480ca374f..8b87c29e8a2 100644 --- a/vortex-datafusion/src/persistent/reader.rs +++ b/vortex-datafusion/src/persistent/reader.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use datafusion_common::Result as DFResult; use datafusion_datasource::PartitionedFile; use object_store::ObjectStore; +use vortex::array::memory::MemorySessionExt; use vortex::io::VortexReadAt; use vortex::io::object_store::ObjectStoreReadAt; use vortex::io::session::RuntimeSessionExt; @@ -45,10 +46,11 @@ impl VortexReaderFactory for DefaultVortexReaderFactory { file: &PartitionedFile, session: &VortexSession, ) -> DFResult> { - Ok(Arc::new(ObjectStoreReadAt::new( + Ok(Arc::new(ObjectStoreReadAt::new_with_allocator( self.object_store.clone(), file.path().as_ref().into(), session.handle(), + session.allocator(), )) as _) } } diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 049010f0d71..54ecc74c32c 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use futures::executor::block_on; use parking_lot::RwLock; use vortex_array::dtype::DType; +use vortex_array::memory::MemorySessionExt; use vortex_array::session::ArraySessionExt; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; @@ -61,7 +62,9 @@ pub struct VortexOpenOptions { labels: Vec