diff --git a/vortex-layout/src/layouts/chunked/mod.rs b/vortex-layout/src/layouts/chunked/mod.rs index ee37cf87411..4e9dcd0a0ea 100644 --- a/vortex-layout/src/layouts/chunked/mod.rs +++ b/vortex-layout/src/layouts/chunked/mod.rs @@ -19,6 +19,7 @@ use crate::LayoutId; use crate::LayoutReaderContext; use crate::LayoutReaderRef; use crate::LayoutRef; +use crate::SharedReaderCache; use crate::VTable; use crate::children::LayoutChildren; use crate::children::OwnedLayoutChildren; @@ -77,12 +78,16 @@ impl VTable for Chunked { session: &VortexSession, ctx: &LayoutReaderContext, ) -> VortexResult { + let reader_cache = layout + .cache + .get_or_create(&segment_source, layout.nchildren()); Ok(Arc::new(ChunkedReader::new( layout.clone(), name, segment_source, session, ctx.clone(), + reader_cache, ))) } @@ -113,6 +118,7 @@ impl VTable for Chunked { layout.children = new_children; layout.chunk_offsets = chunk_offsets; + layout.cache = SharedReaderCache::new(); Ok(()) } } @@ -130,6 +136,7 @@ pub struct ChunkedLayout { dtype: DType, children: Arc, chunk_offsets: Vec, + cache: SharedReaderCache, } impl ChunkedLayout { @@ -149,6 +156,7 @@ impl ChunkedLayout { dtype, children, chunk_offsets, + cache: SharedReaderCache::new(), } } diff --git a/vortex-layout/src/layouts/chunked/reader.rs b/vortex-layout/src/layouts/chunked/reader.rs index 4d095cfee37..5a7e0d9a6aa 100644 --- a/vortex-layout/src/layouts/chunked/reader.rs +++ b/vortex-layout/src/layouts/chunked/reader.rs @@ -29,6 +29,7 @@ use vortex_session::VortexSession; use crate::LayoutReaderContext; use crate::LayoutReaderRef; use crate::LazyReaderChildren; +use crate::LazyReaderChildrenCache; use crate::layouts::chunked::ChunkedLayout; use crate::reader::LayoutReader; use crate::reader::RowSplits; @@ -51,6 +52,7 @@ impl ChunkedReader { segment_source: Arc, session: &VortexSession, ctx: LayoutReaderContext, + reader_cache: LazyReaderChildrenCache, ) -> Self { let nchildren = layout.nchildren(); let dtypes = vec![layout.dtype.clone(); nchildren]; @@ -72,6 +74,7 @@ impl ChunkedReader { segment_source, session.clone(), ctx, + reader_cache, ); Self { diff --git a/vortex-layout/src/layouts/struct_/reader.rs b/vortex-layout/src/layouts/struct_/reader.rs index 5261f85f9a8..cb33e4392df 100644 --- a/vortex-layout/src/layouts/struct_/reader.rs +++ b/vortex-layout/src/layouts/struct_/reader.rs @@ -42,6 +42,7 @@ use crate::ArrayFuture; use crate::LayoutReader; use crate::LayoutReaderRef; use crate::LazyReaderChildren; +use crate::LazyReaderChildrenCache; use crate::RowSplits; use crate::SplitRange; use crate::layouts::partitioned::PartitionedExprEval; @@ -94,6 +95,7 @@ impl StructReader { names.insert(0, Arc::from("validity")); } + let cache = LazyReaderChildrenCache::new(layout.nchildren()); let lazy_children = LazyReaderChildren::new( Arc::clone(&layout.children), dtypes, @@ -101,6 +103,7 @@ impl StructReader { Arc::clone(&segment_source), session.clone(), ctx, + cache, ); // Create an expanded root expression that contains all fields of the struct. diff --git a/vortex-layout/src/layouts/zoned/reader.rs b/vortex-layout/src/layouts/zoned/reader.rs index 6666be880ab..941dc77cf87 100644 --- a/vortex-layout/src/layouts/zoned/reader.rs +++ b/vortex-layout/src/layouts/zoned/reader.rs @@ -22,6 +22,7 @@ use vortex_session::VortexSession; use crate::LayoutReader; use crate::LayoutReaderRef; use crate::LazyReaderChildren; +use crate::LazyReaderChildrenCache; use crate::RowSplits; use crate::SplitRange; use crate::layouts::zoned::ZonedLayout; @@ -49,6 +50,7 @@ impl ZonedReader { stats_table_dtype(layout.dtype(), layout.present_stats()), ]; let names = vec![Arc::clone(&name), format!("{}.zones", name).into()]; + let cache = LazyReaderChildrenCache::new(layout.nchildren()); let lazy_children = Arc::new(LazyReaderChildren::new( Arc::clone(&layout.children), dtypes, @@ -56,6 +58,7 @@ impl ZonedReader { Arc::clone(&segment_source), session.clone(), ctx, + cache, )); Ok(Self { diff --git a/vortex-layout/src/reader.rs b/vortex-layout/src/reader.rs index c4f65c1c96c..4a018c21b0c 100644 --- a/vortex-layout/src/reader.rs +++ b/vortex-layout/src/reader.rs @@ -2,12 +2,17 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::any::Any; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::fmt::Result; use std::ops::Range; use std::sync::Arc; +use std::sync::Weak; use futures::future::BoxFuture; use futures::try_join; use once_cell::sync::OnceCell; +use parking_lot::Mutex; use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::MaskFuture; @@ -202,6 +207,20 @@ impl ArrayFutureExt for ArrayFuture { } } +/// Per child layout reader cache +#[derive(Clone)] +pub struct LazyReaderChildrenCache(Arc<[OnceCell]>); + +impl LazyReaderChildrenCache { + pub fn new(len: usize) -> Self { + Self(vec![OnceCell::new(); len].into_boxed_slice().into()) + } + + pub fn item(&self, idx: usize) -> &OnceCell { + &self.0[idx] + } +} + pub struct LazyReaderChildren { children: Arc, dtypes: Vec, @@ -209,11 +228,12 @@ pub struct LazyReaderChildren { segment_source: Arc, session: VortexSession, ctx: LayoutReaderContext, - // TODO(ngates): we may want a hash map of some sort here? - cache: Vec>, + cache: LazyReaderChildrenCache, } impl LazyReaderChildren { + /// If cache is supplied, caller must ensure it was created with same + /// "nchildren" and "segment_source" pub fn new( children: Arc, dtypes: Vec, @@ -221,9 +241,8 @@ impl LazyReaderChildren { segment_source: Arc, session: VortexSession, ctx: LayoutReaderContext, + cache: LazyReaderChildrenCache, ) -> Self { - let nchildren = children.nchildren(); - let cache = (0..nchildren).map(|_| OnceCell::new()).collect(); Self { children, dtypes, @@ -236,14 +255,8 @@ impl LazyReaderChildren { } pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> { - if idx >= self.cache.len() { - vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len()); - } - - self.cache[idx].get_or_try_init(|| { - let dtype = &self.dtypes[idx]; - let child = self.children.child(idx, dtype)?; - child.new_reader( + self.cache.item(idx).get_or_try_init(|| { + self.children.child(idx, &self.dtypes[idx])?.new_reader( Arc::clone(&self.names[idx]), Arc::clone(&self.segment_source), &self.session, @@ -252,3 +265,53 @@ impl LazyReaderChildren { }) } } + +struct SharedReaderCacheInner { + weak: Weak, + cache: LazyReaderChildrenCache, +} + +/// Per segment source reader cache +#[derive(Clone)] +pub struct SharedReaderCache(Arc>>); + +impl SharedReaderCache { + pub fn new() -> Self { + Self(Arc::new(Mutex::new(None))) + } + + /// Return or initialize cached LazyReaderChildrenCache for "segment_source" + pub fn get_or_create( + &self, + segment_source: &Arc, + nchildren: usize, + ) -> LazyReaderChildrenCache { + let mut guard = self.0.lock(); + if let Some(inner) = guard.as_ref() + && let Some(strong) = inner.weak.upgrade() + && Arc::ptr_eq(&strong, &segment_source) + { + inner.cache.clone() + } else { + let weak = Arc::downgrade(segment_source); + let cache = LazyReaderChildrenCache::new(nchildren); + *guard = Some(SharedReaderCacheInner { + weak, + cache: cache.clone(), + }); + cache + } + } +} + +impl Default for SharedReaderCache { + fn default() -> Self { + Self::new() + } +} + +impl Debug for SharedReaderCache { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + f.debug_struct("SharedReaderCache").finish_non_exhaustive() + } +}