diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index 762d5e00fb0..caecedcf6f2 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -150,7 +150,7 @@ impl DynAccumulator for Accumulator { // `Combined::try_accumulate` always returns true, so a later kernel check would be // unreachable. { - let kernels_r = kernels.read(); + let kernels_r = kernels.load(); let batch_id = batch.encoding_id(); let kernel = kernels_r .get(&(batch_id, Some(self.aggregate_fn.id()))) @@ -187,7 +187,7 @@ impl DynAccumulator for Accumulator { break; } - let kernels_r = kernels.read(); + let kernels_r = kernels.load(); let batch_id = batch.encoding_id(); let kernel = kernels_r .get(&(batch_id, Some(self.aggregate_fn.id()))) diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index a751a7c5749..cba2b00f46e 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -2,6 +2,8 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use arrow_buffer::ArrowNativeType; +use num_traits::AsPrimitive; +use num_traits::ToPrimitive; use vortex_buffer::Buffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -9,6 +11,7 @@ use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_error::vortex_err; use vortex_error::vortex_panic; +use vortex_mask::AllOr; use vortex_mask::Mask; use crate::AnyCanonical; @@ -26,14 +29,18 @@ use crate::aggregate_fn::session::AggregateFnSessionExt; use crate::arrays::ChunkedArray; use crate::arrays::FixedSizeListArray; use crate::arrays::ListViewArray; +use crate::arrays::Primitive; +use crate::arrays::PrimitiveArray; use crate::arrays::fixed_size_list::FixedSizeListArrayExt; use crate::arrays::listview::ListViewArrayExt; use crate::builders::builder_with_capacity; use crate::builtins::ArrayBuiltins; use crate::dtype::DType; use crate::dtype::IntegerPType; +use crate::dtype::NativePType; use crate::executor::max_iterations; use crate::match_each_integer_ptype; +use crate::match_each_native_ptype; /// Reference-counted type-erased grouped accumulator. pub type GroupedAccumulatorRef = Box; @@ -170,7 +177,7 @@ impl GroupedAccumulator { break; } - let kernels_r = kernels.read(); + let kernels_r = kernels.load(); if let Some(result) = kernels_r .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) .or_else(|| kernels_r.get(&(elements.encoding_id(), None))) @@ -224,6 +231,32 @@ impl GroupedAccumulator { validity: &Mask, ctx: &mut ExecutionCtx, ) -> VortexResult<()> { + // Fast path: summing a canonical, all-valid primitive column. The generic loop below + // creates a slice array + does a kernel-dispatching scalar `accumulate` + scalar boxing per + // group, which is catastrophic for many small groups. A direct typed per-group slice sum is + // orders of magnitude faster. Falls through for null elements / other aggregates. + if self.aggregate_fn.id().as_str() == "vortex.sum" + && let Some(result) = try_sum_primitive_groups( + elements, + offsets, + sizes, + validity, + &self.partial_dtype, + ctx, + )? + { + return self.push_result(result); + } + + // Fast path: `count` over any element type is just the number of valid elements per group + // (= group size when the elements are all-valid). Avoids the per-group scalar accumulator. + if self.aggregate_fn.id().as_str() == "vortex.count" + && let Some(result) = + try_count_groups(elements, offsets, sizes, validity, &self.partial_dtype, ctx)? + { + return self.push_result(result); + } + let mut accumulator = Accumulator::try_new( self.vtable.clone(), self.options.clone(), @@ -262,7 +295,7 @@ impl GroupedAccumulator { break; } - let kernels_r = kernels.read(); + let kernels_r = kernels.load(); if let Some(result) = kernels_r .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) .or_else(|| kernels_r.get(&(elements.encoding_id(), None))) @@ -332,3 +365,191 @@ impl GroupedAccumulator { Ok(()) } } + +/// Fast vectorized per-group `Sum` over a canonical, all-valid primitive elements array. +/// +/// Returns `Ok(None)` (caller falls back to the generic per-group path) when the elements are not a +/// canonical primitive, contain nulls, or the result dtype would not match the `Sum` partial dtype. +fn try_sum_primitive_groups( + elements: &ArrayRef, + offsets: &[O], + sizes: &[O], + group_validity: &Mask, + partial_dtype: &DType, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + let Some(prim) = elements.as_opt::() else { + return Ok(None); + }; + // Materialize the element validity once. The common case (a nullable column with no actual + // nulls) is `AllOr::All` and takes the tight slice-sum loop; mixed validity falls to a masked + // loop. (`AllOr::None` -> every valid group sums to zero, matching `Sum`.) + let elem_mask = prim.validity()?.execute_mask(prim.len(), ctx)?; + let all_valid = matches!(elem_mask.slices(), AllOr::All); + + let result = match_each_native_ptype!(prim.ptype(), + unsigned: |T| { sum_groups_unsigned::(prim.as_slice::(), offsets, sizes, group_validity, &elem_mask, all_valid) }, + signed: |T| { sum_groups_signed::(prim.as_slice::(), offsets, sizes, group_validity, &elem_mask, all_valid) }, + floating: |T| { sum_groups_float::(prim.as_slice::(), offsets, sizes, group_validity, &elem_mask, all_valid) } + ); + + // Defensive: if our widening doesn't exactly match the aggregate's partial dtype, fall back to + // the generic path rather than emit a mistyped array downstream. + if result.dtype() != partial_dtype { + return Ok(None); + } + Ok(Some(result)) +} + +/// Fast vectorized per-group `count` (number of non-null elements per group). For all-valid +/// elements this is just the group size. Returns `Ok(None)` (caller falls back) when any group is +/// null (the non-nullable count partial can't represent it) or the result dtype mismatches. +fn try_count_groups( + elements: &ArrayRef, + offsets: &[O], + sizes: &[O], + group_validity: &Mask, + partial_dtype: &DType, + ctx: &mut ExecutionCtx, +) -> VortexResult> { + // The count partial dtype is non-nullable, so it cannot represent a null group. + if !matches!(group_validity.slices(), AllOr::All) { + return Ok(None); + } + let elem_mask = elements.validity()?.execute_mask(elements.len(), ctx)?; + let all_valid = matches!(elem_mask.slices(), AllOr::All); + + let counts = offsets.iter().zip(sizes.iter()).map(|(o, sz)| { + let o = o.to_usize().vortex_expect("offset usize"); + let sz = sz.to_usize().vortex_expect("size usize"); + if all_valid { + sz as u64 + } else { + (o..o + sz).filter(|&j| elem_mask.value(j)).count() as u64 + } + }); + let result = PrimitiveArray::from_iter(counts).into_array(); + + if result.dtype() != partial_dtype { + return Ok(None); + } + Ok(Some(result)) +} + +fn sum_groups_unsigned( + values: &[T], + offsets: &[O], + sizes: &[O], + group_validity: &Mask, + elem_mask: &Mask, + all_valid: bool, +) -> ArrayRef +where + T: NativePType + AsPrimitive, + O: IntegerPType, +{ + let iter = offsets + .iter() + .zip(sizes.iter()) + .enumerate() + .map(|(i, (o, sz))| { + if !group_validity.value(i) { + return None; + } + let o = o.to_usize().vortex_expect("offset usize"); + let sz = sz.to_usize().vortex_expect("size usize"); + let mut acc: u64 = 0; + if all_valid { + for &v in &values[o..o + sz] { + acc = acc.checked_add(v.as_())?; // overflow -> null, matching Sum saturation + } + } else { + for j in 0..sz { + if elem_mask.value(o + j) { + acc = acc.checked_add(values[o + j].as_())?; + } + } + } + Some(acc) + }); + PrimitiveArray::from_option_iter(iter).into_array() +} + +fn sum_groups_signed( + values: &[T], + offsets: &[O], + sizes: &[O], + group_validity: &Mask, + elem_mask: &Mask, + all_valid: bool, +) -> ArrayRef +where + T: NativePType + AsPrimitive, + O: IntegerPType, +{ + let iter = offsets + .iter() + .zip(sizes.iter()) + .enumerate() + .map(|(i, (o, sz))| { + if !group_validity.value(i) { + return None; + } + let o = o.to_usize().vortex_expect("offset usize"); + let sz = sz.to_usize().vortex_expect("size usize"); + let mut acc: i64 = 0; + if all_valid { + for &v in &values[o..o + sz] { + acc = acc.checked_add(v.as_())?; // overflow -> null, matching Sum saturation + } + } else { + for j in 0..sz { + if elem_mask.value(o + j) { + acc = acc.checked_add(values[o + j].as_())?; + } + } + } + Some(acc) + }); + PrimitiveArray::from_option_iter(iter).into_array() +} + +fn sum_groups_float( + values: &[T], + offsets: &[O], + sizes: &[O], + group_validity: &Mask, + elem_mask: &Mask, + all_valid: bool, +) -> ArrayRef +where + T: NativePType + ToPrimitive, + O: IntegerPType, +{ + let iter = offsets + .iter() + .zip(sizes.iter()) + .enumerate() + .map(|(i, (o, sz))| { + if !group_validity.value(i) { + return None; + } + let o = o.to_usize().vortex_expect("offset usize"); + let sz = sz.to_usize().vortex_expect("size usize"); + let mut acc: f64 = 0.0; + // NaN propagates, matching Sum's float semantics. + if all_valid { + for &v in &values[o..o + sz] { + acc += v.to_f64().vortex_expect("float to f64"); + } + } else { + for j in 0..sz { + if elem_mask.value(o + j) { + acc += values[o + j].to_f64().vortex_expect("float to f64"); + } + } + } + Some(acc) + }); + PrimitiveArray::from_option_iter(iter).into_array() +} diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index d89f9da069d..2e6805fc0e1 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -4,7 +4,7 @@ use std::any::Any; use std::sync::Arc; -use parking_lot::RwLock; +use arc_swap::ArcSwap; use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::SessionVar; @@ -51,8 +51,11 @@ pub type AggregateFnRegistry = Registry; pub struct AggregateFnSession { registry: AggregateFnRegistry, - pub(super) kernels: RwLock>, - pub(super) grouped_kernels: RwLock>, + // `ArcSwap` rather than `RwLock`: kernels are registered once at session construction and read + // on every accumulate call. Under parallel reduce, a `RwLock` read-lock here was the dominant + // cost (`lock_shared_slow`); `ArcSwap` makes the per-accumulate lookup a lock-free atomic load. + pub(super) kernels: ArcSwap>, + pub(super) grouped_kernels: ArcSwap>, } impl SessionVar for AggregateFnSession { @@ -71,8 +74,8 @@ impl Default for AggregateFnSession { fn default() -> Self { let this = Self { registry: AggregateFnRegistry::default(), - kernels: RwLock::new(HashMap::default()), - grouped_kernels: RwLock::new(HashMap::default()), + kernels: ArcSwap::from_pointee(HashMap::default()), + grouped_kernels: ArcSwap::from_pointee(HashMap::default()), }; // Register the built-in aggregate functions @@ -125,9 +128,12 @@ impl AggregateFnSession { agg_fn_id: Option>, kernel: &'static dyn DynAggregateKernel, ) { - self.kernels - .write() - .insert((array_id.into(), agg_fn_id.map(|id| id.into())), kernel); + let key = (array_id.into(), agg_fn_id.map(|id| id.into())); + self.kernels.rcu(|current| { + let mut new = (**current).clone(); + new.insert(key.clone(), kernel); + new + }); } } diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index d6070ac1a4d..326b460705e 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -15,6 +15,7 @@ use std::env::VarError; use std::fmt; use std::fmt::Display; +use std::sync::Arc; use std::sync::LazyLock; #[cfg(debug_assertions)] use std::sync::atomic::AtomicUsize; @@ -26,7 +27,6 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_error::vortex_panic; -use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::VortexSession; @@ -305,6 +305,12 @@ struct StackFrame { #[derive(Debug, Clone)] pub struct ExecutionCtx { session: VortexSession, + /// Snapshot of the session's [`ArrayKernels`], cached once at construction. + /// + /// The executor consults these per array node; looking them up on the session each time takes a + /// `DashMap` shard read-lock, which becomes a major contention point (`lock_shared_slow`) when + /// many threads execute concurrently. Caching the (cheap) snapshot here avoids that. + kernels: Option>, #[cfg(debug_assertions)] id: usize, #[cfg(debug_assertions)] @@ -314,8 +320,12 @@ pub struct ExecutionCtx { impl ExecutionCtx { /// Create a new execution context with the given session. pub fn new(session: VortexSession) -> Self { + let kernels = session + .get_opt::() + .map(|k| Arc::new((*k).clone())); Self { session, + kernels, #[cfg(debug_assertions)] id: { static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0); @@ -331,6 +341,11 @@ impl ExecutionCtx { &self.session } + /// The cached [`ArrayKernels`] snapshot for this execution, if any were registered. + fn array_kernels(&self) -> Option> { + self.kernels.clone() + } + /// Get the session-scoped host allocator for this execution context. pub fn allocator(&self) -> HostAllocatorRef { self.session.allocator() @@ -424,13 +439,12 @@ impl Executable for ArrayRef { } } - let tmp_session = ctx.session().clone(); - let kernels = tmp_session.get_opt::(); + let kernels = ctx.array_kernels(); for (slot_idx, slot) in array.slots().iter().enumerate() { let Some(child) = slot else { continue }; if let Some(executed_parent) = - execute_parent_for_child(&array, child, slot_idx, kernels.as_ref(), ctx)? + execute_parent_for_child(&array, child, slot_idx, kernels.as_deref(), ctx)? { ctx.log(format_args!( "execute_parent: slot[{}]({}) rewrote {} -> {}", @@ -542,7 +556,7 @@ fn execute_parent_for_child( parent: &ArrayRef, child: &ArrayRef, slot_idx: usize, - kernels: Option<&Ref>, + kernels: Option<&ArrayKernels>, ctx: &mut ExecutionCtx, ) -> VortexResult> { if let Some(kernels) = kernels @@ -561,13 +575,12 @@ fn execute_parent_for_child( /// Try execute_parent on each occupied slot of the array. fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult> { - let tmp_session = ctx.session().clone(); - let kernels = tmp_session.get_opt::(); + let kernels = ctx.array_kernels(); for (slot_idx, slot) in array.slots().iter().enumerate() { let Some(child) = slot else { continue }; if let Some(executed_parent) = - execute_parent_for_child(array, child, slot_idx, kernels.as_ref(), ctx)? + execute_parent_for_child(array, child, slot_idx, kernels.as_deref(), ctx)? { ctx.log(format_args!( "execute_parent: slot[{}]({}) rewrote {} -> {}", diff --git a/vortex-array/src/expr/analysis/immediate_access.rs b/vortex-array/src/expr/analysis/immediate_access.rs index 805400be793..f67a827f877 100644 --- a/vortex-array/src/expr/analysis/immediate_access.rs +++ b/vortex-array/src/expr/analysis/immediate_access.rs @@ -2,9 +2,13 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_err; use vortex_utils::aliases::hash_set::HashSet; +use crate::dtype::DType; use crate::dtype::FieldName; +use crate::dtype::FieldPath; use crate::dtype::StructFields; use crate::expr::Expression; use crate::expr::analysis::AnnotationFn; @@ -89,3 +93,60 @@ pub fn immediate_scope_access<'a>( .vortex_expect("Expression missing from scope accesses, this is a internal bug") .clone() } + +/// Returns the rooted field paths referenced by an expression. +/// +/// Unlike [`immediate_scope_access`], this preserves nested field accesses. A standalone root +/// expression is represented by [`FieldPath::root`], which conservatively selects all fields. +pub fn referenced_field_paths( + expr: &Expression, + scope: &DType, +) -> VortexResult> { + let mut field_paths = HashSet::new(); + collect_referenced_field_paths(expr, scope, &mut field_paths)?; + Ok(field_paths) +} + +fn collect_referenced_field_paths( + expr: &Expression, + scope: &DType, + field_paths: &mut HashSet, +) -> VortexResult<()> { + if let Some(selection) = expr.as_opt::