From 6e8bf6557254b1e5b7b6a1c6e87bbe20b7068c77 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Wed, 3 Jun 2026 17:17:18 +0100 Subject: [PATCH 1/4] kernels, other hacks --- vortex-array/src/executor.rs | 29 ++++++--- .../src/expr/analysis/immediate_access.rs | 61 +++++++++++++++++++ vortex-array/src/optimizer/kernels.rs | 10 +++ vortex-layout/src/scan/scan_builder.rs | 16 +++-- vortex/src/lib.rs | 2 + 5 files changed, 101 insertions(+), 17 deletions(-) diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index d6070ac1a4d..326b460705e 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -15,6 +15,7 @@ use std::env::VarError; use std::fmt; use std::fmt::Display; +use std::sync::Arc; use std::sync::LazyLock; #[cfg(debug_assertions)] use std::sync::atomic::AtomicUsize; @@ -26,7 +27,6 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_error::vortex_panic; -use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::VortexSession; @@ -305,6 +305,12 @@ struct StackFrame { #[derive(Debug, Clone)] pub struct ExecutionCtx { session: VortexSession, + /// Snapshot of the session's [`ArrayKernels`], cached once at construction. + /// + /// The executor consults these per array node; looking them up on the session each time takes a + /// `DashMap` shard read-lock, which becomes a major contention point (`lock_shared_slow`) when + /// many threads execute concurrently. Caching the (cheap) snapshot here avoids that. + kernels: Option>, #[cfg(debug_assertions)] id: usize, #[cfg(debug_assertions)] @@ -314,8 +320,12 @@ pub struct ExecutionCtx { impl ExecutionCtx { /// Create a new execution context with the given session. pub fn new(session: VortexSession) -> Self { + let kernels = session + .get_opt::() + .map(|k| Arc::new((*k).clone())); Self { session, + kernels, #[cfg(debug_assertions)] id: { static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0); @@ -331,6 +341,11 @@ impl ExecutionCtx { &self.session } + /// The cached [`ArrayKernels`] snapshot for this execution, if any were registered. + fn array_kernels(&self) -> Option> { + self.kernels.clone() + } + /// Get the session-scoped host allocator for this execution context. pub fn allocator(&self) -> HostAllocatorRef { self.session.allocator() @@ -424,13 +439,12 @@ impl Executable for ArrayRef { } } - let tmp_session = ctx.session().clone(); - let kernels = tmp_session.get_opt::(); + let kernels = ctx.array_kernels(); for (slot_idx, slot) in array.slots().iter().enumerate() { let Some(child) = slot else { continue }; if let Some(executed_parent) = - execute_parent_for_child(&array, child, slot_idx, kernels.as_ref(), ctx)? + execute_parent_for_child(&array, child, slot_idx, kernels.as_deref(), ctx)? { ctx.log(format_args!( "execute_parent: slot[{}]({}) rewrote {} -> {}", @@ -542,7 +556,7 @@ fn execute_parent_for_child( parent: &ArrayRef, child: &ArrayRef, slot_idx: usize, - kernels: Option<&Ref>, + kernels: Option<&ArrayKernels>, ctx: &mut ExecutionCtx, ) -> VortexResult> { if let Some(kernels) = kernels @@ -561,13 +575,12 @@ fn execute_parent_for_child( /// Try execute_parent on each occupied slot of the array. fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult> { - let tmp_session = ctx.session().clone(); - let kernels = tmp_session.get_opt::(); + let kernels = ctx.array_kernels(); for (slot_idx, slot) in array.slots().iter().enumerate() { let Some(child) = slot else { continue }; if let Some(executed_parent) = - execute_parent_for_child(array, child, slot_idx, kernels.as_ref(), ctx)? + execute_parent_for_child(array, child, slot_idx, kernels.as_deref(), ctx)? { ctx.log(format_args!( "execute_parent: slot[{}]({}) rewrote {} -> {}", diff --git a/vortex-array/src/expr/analysis/immediate_access.rs b/vortex-array/src/expr/analysis/immediate_access.rs index 805400be793..f67a827f877 100644 --- a/vortex-array/src/expr/analysis/immediate_access.rs +++ b/vortex-array/src/expr/analysis/immediate_access.rs @@ -2,9 +2,13 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_err; use vortex_utils::aliases::hash_set::HashSet; +use crate::dtype::DType; use crate::dtype::FieldName; +use crate::dtype::FieldPath; use crate::dtype::StructFields; use crate::expr::Expression; use crate::expr::analysis::AnnotationFn; @@ -89,3 +93,60 @@ pub fn immediate_scope_access<'a>( .vortex_expect("Expression missing from scope accesses, this is a internal bug") .clone() } + +/// Returns the rooted field paths referenced by an expression. +/// +/// Unlike [`immediate_scope_access`], this preserves nested field accesses. A standalone root +/// expression is represented by [`FieldPath::root`], which conservatively selects all fields. +pub fn referenced_field_paths( + expr: &Expression, + scope: &DType, +) -> VortexResult> { + let mut field_paths = HashSet::new(); + collect_referenced_field_paths(expr, scope, &mut field_paths)?; + Ok(field_paths) +} + +fn collect_referenced_field_paths( + expr: &Expression, + scope: &DType, + field_paths: &mut HashSet, +) -> VortexResult<()> { + if let Some(selection) = expr.as_opt::