Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions vortex-layout/src/layouts/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::LayoutId;
use crate::LayoutReaderContext;
use crate::LayoutReaderRef;
use crate::LayoutRef;
use crate::SharedReaderCache;
use crate::VTable;
use crate::children::LayoutChildren;
use crate::children::OwnedLayoutChildren;
Expand Down Expand Up @@ -77,12 +78,16 @@ impl VTable for Chunked {
session: &VortexSession,
ctx: &LayoutReaderContext,
) -> VortexResult<LayoutReaderRef> {
let reader_cache = layout
.cache
.get_or_create(&segment_source, layout.nchildren());
Ok(Arc::new(ChunkedReader::new(
layout.clone(),
name,
segment_source,
session,
ctx.clone(),
reader_cache,
)))
}

Expand Down Expand Up @@ -113,6 +118,7 @@ impl VTable for Chunked {

layout.children = new_children;
layout.chunk_offsets = chunk_offsets;
layout.cache = SharedReaderCache::new();
Ok(())
}
}
Expand All @@ -130,6 +136,7 @@ pub struct ChunkedLayout {
dtype: DType,
children: Arc<dyn LayoutChildren>,
chunk_offsets: Vec<u64>,
cache: SharedReaderCache,
}

impl ChunkedLayout {
Expand All @@ -149,6 +156,7 @@ impl ChunkedLayout {
dtype,
children,
chunk_offsets,
cache: SharedReaderCache::new(),
}
}

Expand Down
3 changes: 3 additions & 0 deletions vortex-layout/src/layouts/chunked/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use vortex_session::VortexSession;
use crate::LayoutReaderContext;
use crate::LayoutReaderRef;
use crate::LazyReaderChildren;
use crate::LazyReaderChildrenCache;
use crate::layouts::chunked::ChunkedLayout;
use crate::reader::LayoutReader;
use crate::reader::RowSplits;
Expand All @@ -51,6 +52,7 @@ impl ChunkedReader {
segment_source: Arc<dyn SegmentSource>,
session: &VortexSession,
ctx: LayoutReaderContext,
reader_cache: LazyReaderChildrenCache,
) -> Self {
let nchildren = layout.nchildren();
let dtypes = vec![layout.dtype.clone(); nchildren];
Expand All @@ -72,6 +74,7 @@ impl ChunkedReader {
segment_source,
session.clone(),
ctx,
reader_cache,
);

Self {
Expand Down
3 changes: 3 additions & 0 deletions vortex-layout/src/layouts/struct_/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::ArrayFuture;
use crate::LayoutReader;
use crate::LayoutReaderRef;
use crate::LazyReaderChildren;
use crate::LazyReaderChildrenCache;
use crate::RowSplits;
use crate::SplitRange;
use crate::layouts::partitioned::PartitionedExprEval;
Expand Down Expand Up @@ -94,13 +95,15 @@ impl StructReader {
names.insert(0, Arc::from("validity"));
}

let cache = LazyReaderChildrenCache::new(layout.nchildren());
let lazy_children = LazyReaderChildren::new(
Arc::clone(&layout.children),
dtypes,
names,
Arc::clone(&segment_source),
session.clone(),
ctx,
cache,
);

// Create an expanded root expression that contains all fields of the struct.
Expand Down
3 changes: 3 additions & 0 deletions vortex-layout/src/layouts/zoned/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use vortex_session::VortexSession;
use crate::LayoutReader;
use crate::LayoutReaderRef;
use crate::LazyReaderChildren;
use crate::LazyReaderChildrenCache;
use crate::RowSplits;
use crate::SplitRange;
use crate::layouts::zoned::ZonedLayout;
Expand Down Expand Up @@ -49,13 +50,15 @@ impl ZonedReader {
stats_table_dtype(layout.dtype(), layout.present_stats()),
];
let names = vec![Arc::clone(&name), format!("{}.zones", name).into()];
let cache = LazyReaderChildrenCache::new(layout.nchildren());
let lazy_children = Arc::new(LazyReaderChildren::new(
Arc::clone(&layout.children),
dtypes,
names,
Arc::clone(&segment_source),
session.clone(),
ctx,
cache,
));

Ok(Self {
Expand Down
87 changes: 75 additions & 12 deletions vortex-layout/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Result;
use std::ops::Range;
use std::sync::Arc;
use std::sync::Weak;

use futures::future::BoxFuture;
use futures::try_join;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use vortex_array::ArrayRef;
use vortex_array::IntoArray;
use vortex_array::MaskFuture;
Expand Down Expand Up @@ -202,28 +207,42 @@ impl ArrayFutureExt for ArrayFuture {
}
}

/// Per child layout reader cache
#[derive(Clone)]
pub struct LazyReaderChildrenCache(Arc<[OnceCell<LayoutReaderRef>]>);

impl LazyReaderChildrenCache {
pub fn new(len: usize) -> Self {
Self(vec![OnceCell::new(); len].into_boxed_slice().into())
}

pub fn item(&self, idx: usize) -> &OnceCell<LayoutReaderRef> {
&self.0[idx]
}
}

pub struct LazyReaderChildren {
children: Arc<dyn LayoutChildren>,
dtypes: Vec<DType>,
names: Vec<Arc<str>>,
segment_source: Arc<dyn SegmentSource>,
session: VortexSession,
ctx: LayoutReaderContext,
// TODO(ngates): we may want a hash map of some sort here?
cache: Vec<OnceCell<LayoutReaderRef>>,
cache: LazyReaderChildrenCache,
}

impl LazyReaderChildren {
/// If cache is supplied, caller must ensure it was created with same
/// "nchildren" and "segment_source"
pub fn new(
children: Arc<dyn LayoutChildren>,
dtypes: Vec<DType>,
names: Vec<Arc<str>>,
segment_source: Arc<dyn SegmentSource>,
session: VortexSession,
ctx: LayoutReaderContext,
cache: LazyReaderChildrenCache,
) -> Self {
let nchildren = children.nchildren();
let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
Self {
children,
dtypes,
Expand All @@ -236,14 +255,8 @@ impl LazyReaderChildren {
}

pub fn get(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
if idx >= self.cache.len() {
vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
}

self.cache[idx].get_or_try_init(|| {
let dtype = &self.dtypes[idx];
let child = self.children.child(idx, dtype)?;
child.new_reader(
self.cache.item(idx).get_or_try_init(|| {
self.children.child(idx, &self.dtypes[idx])?.new_reader(
Arc::clone(&self.names[idx]),
Arc::clone(&self.segment_source),
&self.session,
Expand All @@ -252,3 +265,53 @@ impl LazyReaderChildren {
})
}
}

struct SharedReaderCacheInner {
weak: Weak<dyn SegmentSource>,
cache: LazyReaderChildrenCache,
}

/// Per segment source reader cache
#[derive(Clone)]
pub struct SharedReaderCache(Arc<Mutex<Option<SharedReaderCacheInner>>>);

impl SharedReaderCache {
pub fn new() -> Self {
Self(Arc::new(Mutex::new(None)))
}

/// Return or initialize cached LazyReaderChildrenCache for "segment_source"
pub fn get_or_create(
&self,
segment_source: &Arc<dyn SegmentSource>,
nchildren: usize,
) -> LazyReaderChildrenCache {
let mut guard = self.0.lock();
if let Some(inner) = guard.as_ref()
&& let Some(strong) = inner.weak.upgrade()
&& Arc::ptr_eq(&strong, &segment_source)
{
inner.cache.clone()
} else {
let weak = Arc::downgrade(segment_source);
let cache = LazyReaderChildrenCache::new(nchildren);
*guard = Some(SharedReaderCacheInner {
weak,
cache: cache.clone(),
});
cache
}
}
}

impl Default for SharedReaderCache {
fn default() -> Self {
Self::new()
}
}

impl Debug for SharedReaderCache {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
f.debug_struct("SharedReaderCache").finish_non_exhaustive()
}
}
Loading