Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/bench-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ jobs:
extra_args: "--off-cpu-threshold=0.03" # Personally tuned by @brancz

- name: Run ${{ matrix.benchmark.name }} benchmark
continue-on-error: true
shell: bash
env:
RUST_BACKTRACE: full
FLAT_LAYOUT_INLINE_ARRAY_NODE: true
run: |
bash scripts/bench-taskset.sh target/release_debug/${{ matrix.benchmark.id }} -d gh-json -o results.json
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ jobs:
extra_args: "--off-cpu-threshold=0.03" # Personally tuned by @brancz

- name: Run ${{ matrix.benchmark.name }} benchmark
continue-on-error: true
shell: bash
env:
RUST_BACKTRACE: full
FLAT_LAYOUT_INLINE_ARRAY_NODE: true
run: |
bash scripts/bench-taskset.sh target/release_debug/${{ matrix.benchmark.id }} --formats ${{ matrix.benchmark.formats }} -d gh-json -o results.json

Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/sql-benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ jobs:

- name: Run ${{ matrix.name }} benchmark
if: matrix.remote_storage == null || github.event.pull_request.head.repo.fork == true
continue-on-error: true
shell: bash
env:
FLAT_LAYOUT_INLINE_ARRAY_NODE: true
OTEL_SERVICE_NAME: "vortex-bench"
OTEL_EXPORTER_OTLP_PROTOCOL: "http/protobuf"
OTEL_EXPORTER_OTLP_ENDPOINT: "${{ (inputs.mode != 'pr' || github.event.pull_request.head.repo.fork == false) && secrets.OTEL_EXPORTER_OTLP_ENDPOINT || '' }}"
Expand All @@ -212,8 +214,10 @@ jobs:

- name: Run ${{ matrix.name }} benchmark (remote)
if: matrix.remote_storage != null && (inputs.mode != 'pr' || github.event.pull_request.head.repo.fork == false)
continue-on-error: true
shell: bash
env:
FLAT_LAYOUT_INLINE_ARRAY_NODE: true
AWS_REGION: "us-east-1"
OTEL_SERVICE_NAME: "vortex-bench"
OTEL_EXPORTER_OTLP_PROTOCOL: "http/protobuf"
Expand Down
4 changes: 4 additions & 0 deletions encodings/bytebool/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ impl vortex_array::arrays::dict::take::TakeExecute for vortex_bytebool::ByteBool

pub fn vortex_bytebool::ByteBool::take(array: vortex_array::array::view::ArrayView<'_, Self>, indices: &vortex_array::array::erased::ArrayRef, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::erased::ArrayRef>>

impl vortex_array::arrays::filter::kernel::FilterReduce for vortex_bytebool::ByteBool

pub fn vortex_bytebool::ByteBool::filter(array: vortex_array::array::view::ArrayView<'_, Self>, mask: &vortex_mask::Mask) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::erased::ArrayRef>>

impl vortex_array::arrays::slice::SliceReduce for vortex_bytebool::ByteBool

pub fn vortex_bytebool::ByteBool::slice(array: vortex_array::array::view::ArrayView<'_, Self>, range: core::ops::range::Range<usize>) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::erased::ArrayRef>>
Expand Down
2 changes: 2 additions & 0 deletions encodings/bytebool/src/rules.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_array::arrays::filter::FilterReduceAdaptor;
use vortex_array::arrays::slice::SliceReduceAdaptor;
use vortex_array::optimizer::rules::ParentRuleSet;
use vortex_array::scalar_fn::fns::cast::CastReduceAdaptor;
Expand All @@ -12,4 +13,5 @@ pub(crate) static RULES: ParentRuleSet<ByteBool> = ParentRuleSet::new(&[
ParentRuleSet::lift(&CastReduceAdaptor(ByteBool)),
ParentRuleSet::lift(&MaskReduceAdaptor(ByteBool)),
ParentRuleSet::lift(&SliceReduceAdaptor(ByteBool)),
ParentRuleSet::lift(&FilterReduceAdaptor(ByteBool)),
]);
17 changes: 17 additions & 0 deletions encodings/bytebool/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use std::ops::Range;
use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::IntoArray;
use vortex_array::arrays::filter::FilterReduce;
use vortex_array::arrays::slice::SliceReduce;
use vortex_error::VortexResult;
use vortex_mask::Mask;

use crate::ByteBool;

Expand All @@ -22,3 +24,18 @@ impl SliceReduce for ByteBool {
))
}
}

/// Filter support for `ByteBool`: applies a mask to the byte buffer and to the validity.
impl FilterReduce for ByteBool {
    /// Filters `array` by `mask`.
    ///
    /// Returns `Ok(None)` when the buffer is on the host and the mask selects more than
    /// half of its positions. Otherwise returns a new `ByteBool` built from the filtered
    /// buffer and the filtered validity.
    fn filter(array: ArrayView<'_, Self>, mask: &Mask) -> VortexResult<Option<ArrayRef>> {
        // NOTE(review): declining presumably leaves dense host-side filters to a different
        // code path; confirm against the `FilterReduce` contract.
        let declined = array.buffer().is_on_host() && mask.true_count() * 2 > mask.len();
        if declined {
            return Ok(None);
        }

        // The size of `u8` is passed as the second argument (presumably the per-element
        // width in bytes; confirm against the buffer `filter` signature).
        let values = array.buffer().filter(mask, size_of::<u8>())?;
        let validity = array.validity()?.filter(mask)?;
        Ok(Some(ByteBool::new(values, validity).into_array()))
    }
}
584 changes: 584 additions & 0 deletions vortex-array/public-api.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions vortex-array/src/aggregate_fn/fns/is_sorted/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ pub(super) fn check_primitive_sorted(array: &PrimitiveArray, strict: bool) -> Vo
}

fn compute_is_sorted<T: NativePType>(array: &PrimitiveArray, strict: bool) -> VortexResult<bool> {
let values = array.to_buffer::<T>();
match array.validity_mask()? {
Mask::AllFalse(_) => Ok(!strict),
Mask::AllTrue(_) => {
let slice = array.as_slice::<T>();
let iter = slice.iter().copied().map(NativeValue);
let iter = values.iter().copied().map(NativeValue);

Ok(if strict {
iter.is_strict_sorted()
Expand All @@ -32,7 +32,7 @@ fn compute_is_sorted<T: NativePType>(array: &PrimitiveArray, strict: bool) -> Vo
let iter = mask_values
.bit_buffer()
.iter()
.zip_eq(array.as_slice::<T>())
.zip_eq(values.iter())
.map(|(is_valid, value)| is_valid.then_some(NativeValue(*value)));

Ok(if strict {
Expand Down
27 changes: 27 additions & 0 deletions vortex-array/src/array/erased.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,33 @@ impl ArrayRef {
self.0.execute_parent(self, parent, child_idx, ctx)
}

/// Builds a copy of this array in which every occupied slot holds a new child.
///
/// `children` are consumed in order and assigned to the occupied (`Some`) slots in slot
/// order; empty (`None`) slots stay empty.
///
/// # Errors
///
/// Returns an error if `children.len()` differs from `self.nchildren()`, or if the
/// subsequent `with_slots` call fails.
pub fn with_children(&self, children: Vec<ArrayRef>) -> VortexResult<ArrayRef> {
    let expected = self.nchildren();
    vortex_ensure!(
        children.len() == expected,
        "expected {} children, got {}",
        expected,
        children.len()
    );

    // NOTE(review): assumes the number of occupied slots equals `nchildren()`; confirm.
    let mut incoming = children.into_iter();
    let slots: Vec<Option<ArrayRef>> = self
        .slots()
        .iter()
        .map(|slot| {
            slot.as_ref().map(|_| {
                incoming
                    .next()
                    .vortex_expect("validated child count must provide a replacement")
            })
        })
        .collect();
    self.clone().with_slots(slots)
}

/// Returns a new array with its buffers replaced.
///
/// Forwards to the `with_buffers` method of the wrapped inner array (`self.0`), passing
/// `self` along as the array handle. Any error from that call is returned unchanged.
pub fn with_buffers(&self, buffers: Vec<BufferHandle>) -> VortexResult<ArrayRef> {
self.0.with_buffers(self, buffers)
}

// ArrayVisitor delegation methods

/// Returns the children of the array.
Expand Down
16 changes: 15 additions & 1 deletion vortex-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ pub(crate) trait DynArray: 'static + private::Sealed + Send + Sync + Debug {
/// Returns a new array with the given slots.
fn with_slots(&self, this: ArrayRef, slots: Vec<Option<ArrayRef>>) -> VortexResult<ArrayRef>;

/// Returns a new array with its buffers replaced.
///
/// `this` is the [`ArrayRef`] handle for this array and `buffers` are the replacement
/// buffers.
/// NOTE(review): the expected order and count of `buffers` are not stated here;
/// presumably they must match the encoding's existing buffers. Confirm.
fn with_buffers(&self, this: &ArrayRef, buffers: Vec<BufferHandle>) -> VortexResult<ArrayRef>;

/// Attempt to reduce the array to a simpler representation.
fn reduce(&self, this: &ArrayRef) -> VortexResult<Option<ArrayRef>>;

Expand Down Expand Up @@ -280,7 +283,6 @@ impl<V: VTable> DynArray for ArrayInner<V> {
let view = unsafe { ArrayView::new_unchecked(this, &self.data) };
(0..V::nchildren(view)).map(|i| V::child(view, i)).collect()
}

fn nchildren(&self, this: &ArrayRef) -> usize {
let view = unsafe { ArrayView::new_unchecked(this, &self.data) };
V::nchildren(view)
Expand Down Expand Up @@ -392,6 +394,18 @@ impl<V: VTable> DynArray for ArrayInner<V> {
.into_array())
}

/// Rebuilds this array around a copy of its data whose buffers have been replaced.
///
/// The vtable, dtype, length, slots and statistics are carried over from `this`.
/// Errors from `V::with_buffers` or from reassembling the array are propagated.
fn with_buffers(&self, this: &ArrayRef, buffers: Vec<BufferHandle>) -> VortexResult<ArrayRef> {
    // Work on a clone so the data held by `self` is not modified.
    let mut updated = self.data.clone();
    V::with_buffers(&mut updated, buffers)?;

    // Capture the statistics before the array is reassembled.
    let stats = this.statistics().to_owned();
    let parts = ArrayParts::new(
        self.vtable.clone(),
        this.dtype().clone(),
        this.len(),
        updated,
    )
    .with_slots(this.slots().to_vec());
    let rebuilt = Array::<V>::try_from_parts(parts)?;
    Ok(rebuilt.with_stats_set(stats).into_array())
}

fn reduce(&self, this: &ArrayRef) -> VortexResult<Option<ArrayRef>> {
let view = unsafe { ArrayView::new_unchecked(this, &self.data) };
let Some(reduced) = V::reduce(view)? else {
Expand Down
15 changes: 15 additions & 0 deletions vortex-array/src/array/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,21 @@ pub trait VTable: 'static + Clone + Sized + Send + Sync + Debug {
/// Panics if `idx >= slots(array).len()`.
fn slot_name(array: ArrayView<'_, Self>, idx: usize) -> String;

/// Replaces the slots in `array` with the given `slots` vec.
///
/// Some encodings use this to perform side-effects (e.g. cache invalidation) when
/// slots change. Once those are removed, this will be replaced by `slots_mut`.
///
/// The default implementation ignores both arguments and returns `Ok(())`.
fn with_slots(_array: &mut Self::ArrayData, _slots: Vec<Option<ArrayRef>>) -> VortexResult<()> {
Ok(())
}

/// Replaces the buffers in `array` with `buffers`.
///
/// This is the path used when lazy device buffers are materialized into host buffers.
///
/// The default implementation ignores both arguments and returns `Ok(())`, leaving
/// `array` untouched.
/// NOTE(review): encodings that keep buffers in their array data presumably need to
/// override this for the replacement to take effect. Confirm.
fn with_buffers(_array: &mut Self::ArrayData, _buffers: Vec<BufferHandle>) -> VortexResult<()> {
Ok(())
}

/// Execute this array by returning an [`ExecutionResult`].
///
/// Execution is **iterative**, not recursive. Instead of recursively executing children,
Expand Down
73 changes: 72 additions & 1 deletion vortex-array/src/arrays/bool/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub trait BoolArrayExt: TypedArrayRef<Bool> {
}

/// Returns the boolean values as a [`BitBuffer`].
///
/// The bits are obtained with `to_host_sync()` and wrapped together with this array's
/// length and bit offset.
fn to_bit_buffer(&self) -> BitBuffer {
    // A single binding via `to_host_sync()`; a preceding `as_host()` binding would be
    // shadowed and never read.
    // NOTE(review): `to_host_sync()` presumably copies device-resident bits to the host,
    // which `as_host()` would not do. Confirm against `BufferHandle`.
    let buffer = self.bits.to_host_sync();
    BitBuffer::new_with_offset(buffer, self.as_ref().len(), self.offset)
}

Expand Down Expand Up @@ -349,11 +349,19 @@ impl IntoArray for BitBufferMut {

#[cfg(test)]
mod tests {
use std::any::Any;
use std::iter::once;
use std::iter::repeat_n;
use std::ops::Range;
use std::sync::Arc;

use futures::FutureExt;
use futures::future::BoxFuture;
use vortex_buffer::Alignment;
use vortex_buffer::BitBuffer;
use vortex_buffer::BitBufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_buffer::buffer;

use crate::IntoArray;
Expand All @@ -363,8 +371,56 @@ mod tests {
use crate::arrays::PrimitiveArray;
use crate::arrays::bool::BoolArrayExt;
use crate::assert_arrays_eq;
use crate::buffer::BufferHandle;
use crate::buffer::DeviceBuffer;
use crate::patches::Patches;
use crate::validity::Validity;
use vortex_error::VortexResult;

/// Test double for a device buffer: a [`ByteBuffer`] exposed through [`DeviceBuffer`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct TestDeviceBuffer(ByteBuffer);

impl DeviceBuffer for TestDeviceBuffer {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Length of the wrapped buffer.
    fn len(&self) -> usize {
        let Self(bytes) = self;
        bytes.len()
    }

    /// Alignment of the wrapped buffer.
    fn alignment(&self) -> Alignment {
        let Self(bytes) = self;
        bytes.alignment()
    }

    /// Clones the wrapped buffer and re-aligns the clone to `alignment`.
    fn copy_to_host_sync(&self, alignment: Alignment) -> VortexResult<ByteBuffer> {
        let host_copy = self.0.clone();
        Ok(host_copy.aligned(alignment))
    }

    /// Performs the synchronous copy up front and returns a future that yields it.
    fn copy_to_host(
        &self,
        alignment: Alignment,
    ) -> VortexResult<BoxFuture<'static, VortexResult<ByteBuffer>>> {
        let host_copy = self.copy_to_host_sync(alignment)?;
        Ok(async move { Ok(host_copy) }.boxed())
    }

    /// Wraps the `range` slice of the underlying buffer in a new test buffer.
    fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer> {
        let window = self.0.slice(range);
        Arc::new(Self(window))
    }

    /// Concatenates the bytes of every range, in order, into a new test buffer that
    /// starts from this buffer's alignment.
    fn copy_ranges(&self, ranges: &[Range<usize>]) -> VortexResult<Arc<dyn DeviceBuffer>> {
        let mut gathered = ByteBufferMut::empty_aligned(self.0.alignment());
        ranges
            .iter()
            .for_each(|r| gathered.extend_from_slice(&self.0[r.start..r.end]));
        Ok(Arc::new(Self(gathered.freeze())))
    }

    /// Returns a new test buffer holding a re-aligned clone of the bytes.
    fn aligned(self: Arc<Self>, alignment: Alignment) -> VortexResult<Arc<dyn DeviceBuffer>> {
        let realigned = self.0.clone().aligned(alignment);
        Ok(Arc::new(Self(realigned)))
    }
}

#[test]
fn bool_array() {
Expand Down Expand Up @@ -477,4 +533,19 @@ mod tests {
let sliced = arr.slice(4..15).unwrap();
assert_arrays_eq!(sliced, BoolArray::from_iter([true; 11]));
}

/// Slicing a `BoolArray` whose bits sit behind a device buffer handle must yield the
/// same values as slicing the equivalent host-backed array.
#[test]
fn slice_device_backed_bool_array_materializes_bits() {
    // Split a host bit buffer into its parts so the bytes can be re-wrapped as "device" data.
    let host_bits = BitBuffer::from_iter([true, false, true, false, true, true]);
    let (offset, len, bytes) = host_bits.into_inner();
    let device_handle = BufferHandle::new_device(Arc::new(TestDeviceBuffer(bytes)));
    let array = BoolArray::new_handle(device_handle, offset, len, Validity::NonNullable);

    // Elements 1..5 of the input are [false, true, false, true].
    let sliced = array.slice(1..5).unwrap();
    assert_arrays_eq!(sliced, BoolArray::from_iter([false, true, false, true]));
}
}
26 changes: 26 additions & 0 deletions vortex-array/src/arrays/decimal/compute/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
use std::ops::Range;

use vortex_error::VortexResult;
use vortex_mask::Mask;

use crate::ArrayRef;
use crate::IntoArray;
use crate::array::ArrayView;
use crate::arrays::Decimal;
use crate::arrays::DecimalArray;
use crate::arrays::Masked;
use crate::arrays::filter::FilterReduce;
use crate::arrays::filter::FilterReduceAdaptor;
use crate::arrays::slice::SliceReduce;
use crate::arrays::slice::SliceReduceAdaptor;
use crate::match_each_decimal_value_type;
Expand All @@ -22,6 +25,7 @@ pub(crate) static RULES: ParentRuleSet<Decimal> = ParentRuleSet::new(&[
ParentRuleSet::lift(&DecimalMaskedValidityRule),
ParentRuleSet::lift(&MaskReduceAdaptor(Decimal)),
ParentRuleSet::lift(&SliceReduceAdaptor(Decimal)),
ParentRuleSet::lift(&FilterReduceAdaptor(Decimal)),
]);

/// Rule to push down validity masking from MaskedArray parent into DecimalArray child.
Expand Down Expand Up @@ -71,3 +75,25 @@ impl SliceReduce for Decimal {
Ok(Some(result))
}
}

/// Filter support for `Decimal`: applies a mask to the values buffer and to the validity.
impl FilterReduce for Decimal {
    /// Filters `array` by `mask`.
    ///
    /// Returns `Ok(None)` when the values buffer is on the host and the mask selects more
    /// than half of its positions. Otherwise returns a new `DecimalArray` with the same
    /// values type and decimal dtype, built from the filtered buffer and validity.
    fn filter(array: ArrayView<'_, Self>, mask: &Mask) -> VortexResult<Option<ArrayRef>> {
        // NOTE(review): declining presumably leaves dense host-side filters to a different
        // code path; confirm against the `FilterReduce` contract.
        let declined =
            array.buffer_handle().is_on_host() && mask.true_count() * 2 > mask.len();
        if declined {
            return Ok(None);
        }

        let filtered = match_each_decimal_value_type!(array.values_type(), |D| {
            // Do the fallible work first so the unsafe block holds only the constructor.
            let values = array.buffer_handle().filter(mask, size_of::<D>())?;
            let validity = array.validity()?.filter(mask)?;
            // SAFETY: Filtering preserves all DecimalArray invariants — values within
            // precision bounds remain valid, and the validity is filtered by the same mask.
            let decimal = unsafe {
                DecimalArray::new_unchecked_handle(
                    values,
                    array.values_type(),
                    array.decimal_dtype(),
                    validity,
                )
            };
            decimal.into_array()
        });
        Ok(Some(filtered))
    }
}
2 changes: 2 additions & 0 deletions vortex-array/src/arrays/fixed_size_list/compute/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use crate::arrays::FixedSizeList;
use crate::arrays::filter::FilterReduceAdaptor;
use crate::arrays::slice::SliceReduceAdaptor;
use crate::optimizer::rules::ParentRuleSet;
use crate::scalar_fn::fns::cast::CastReduceAdaptor;
Expand All @@ -11,4 +12,5 @@ pub(crate) const PARENT_RULES: ParentRuleSet<FixedSizeList> = ParentRuleSet::new
ParentRuleSet::lift(&CastReduceAdaptor(FixedSizeList)),
ParentRuleSet::lift(&MaskReduceAdaptor(FixedSizeList)),
ParentRuleSet::lift(&SliceReduceAdaptor(FixedSizeList)),
ParentRuleSet::lift(&FilterReduceAdaptor(FixedSizeList)),
]);
Loading
Loading