Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions vortex-btrblocks/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[
&integer::RunEndScheme,
&integer::SequenceScheme,
&integer::IntRLEScheme,
// Prefer all other schemes above delta, for now (since its slower to decompress).
#[cfg(feature = "unstable_encodings")]
&integer::DeltaScheme,
////////////////////////////////////////////////////////////////////////////////////////////////
// Float schemes.
////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -194,6 +197,10 @@ impl BtrBlocksCompressorBuilder {
];
#[cfg(feature = "unstable_encodings")]
excluded.push(string::OnPairScheme.id());
// Delta has no GPU decode kernel and its prefix-sum decode is inherently sequential, so it
// is incompatible with pure-GPU decompression paths.
#[cfg(feature = "unstable_encodings")]
excluded.push(integer::DeltaScheme.id());
let builder = self.exclude_schemes(excluded);

#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
Expand Down
177 changes: 177 additions & 0 deletions vortex-btrblocks/src/schemes/integer/delta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! FastLanes Delta integer encoding.

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_compressor::builtins::BinaryDictScheme;
use vortex_compressor::builtins::FloatDictScheme;
use vortex_compressor::builtins::IntDictScheme;
use vortex_compressor::builtins::StringDictScheme;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_compressor::estimate::EstimateScore;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_compressor::scheme::AncestorExclusion;
use vortex_compressor::scheme::ChildSelection;
use vortex_compressor::scheme::DescendantExclusion;
use vortex_error::VortexResult;
use vortex_fastlanes::Delta;

use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::GenerateStatsOptions;
use crate::Scheme;
use crate::SchemeExt;

/// FastLanes Delta encoding for smooth / near-monotone integers.
///
/// Delta replaces each value with its difference from an earlier value (at the FastLanes lane
/// stride), so a later cascade layer (FoR / BitPacking) packs the smaller residuals. It only
/// pays off when those residuals span meaningfully fewer bits than the values themselves.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct DeltaScheme;

/// Multiplicative penalty applied to Delta's estimated compression ratio.
///
/// Unlike FoR/BitPacking, Delta breaks random access and adds a prefix-sum decode pass, and it
/// carries a structural sign bit on its residuals. We therefore require Delta to be meaningfully
/// (~5%) smaller than the best alternative before it wins, rather than picking it for a
/// single-bit gain. This factor encodes that "delta tax".
const DELTA_PENALTY: f64 = 0.95;

/// Minimum length before Delta is worth considering (one FastLanes chunk).
const MIN_DELTA_LEN: usize = 1024;

impl Scheme for DeltaScheme {
fn scheme_name(&self) -> &'static str {
"vortex.int.delta"
}

fn matches(&self, canonical: &Canonical) -> bool {
canonical.dtype().is_int()
}

fn num_children(&self) -> usize {
2
}

/// Delta-encode the data at most once per path: exclude Delta from the subtrees of both the
/// bases and the deltas children so we never delta-encode data that was already delta-encoded.
fn descendant_exclusions(&self) -> Vec<DescendantExclusion> {
vec![DescendantExclusion {
excluded: DeltaScheme.id(),
children: ChildSelection::All,
}]
}

/// Delta over dictionary codes just adds indirection: codes are compact integers with no
/// monotone structure, so (like FoR/Sequence) skip the codes child.
fn ancestor_exclusions(&self) -> Vec<AncestorExclusion> {
vec![
AncestorExclusion {
ancestor: IntDictScheme.id(),
children: ChildSelection::One(1),
},
AncestorExclusion {
ancestor: FloatDictScheme.id(),
children: ChildSelection::One(1),
},
AncestorExclusion {
ancestor: StringDictScheme.id(),
children: ChildSelection::One(1),
},
AncestorExclusion {
ancestor: BinaryDictScheme.id(),
children: ChildSelection::One(1),
},
]
}

fn expected_compression_ratio(
&self,
data: &ArrayAndStats,
compress_ctx: CompressorContext,
_exec_ctx: &mut ExecutionCtx,
) -> CompressionEstimate {
// Delta only pays off if a later cascade layer (FoR/BitPacking) packs the residuals.
if compress_ctx.finished_cascading() {
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}
// Too short to transpose into FastLanes chunks meaningfully.
if data.array_len() < MIN_DELTA_LEN {
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

// Estimating Delta needs the real transposed-delta span, so defer to a callback that
// delta-encodes the array and measures the residual range.
CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
|_compressor, data, best_so_far, _ctx, exec_ctx| {
let primitive = data.array().clone().execute::<PrimitiveArray>(exec_ctx)?;
let full_width = primitive.ptype().bit_width() as f64;

// Delta's best case is residuals collapsing to a single bit. If even that, after
// the penalty, can't beat the incumbent, skip before doing the encode work.
let threshold = best_so_far.and_then(EstimateScore::finite_ratio);
if threshold.is_some_and(|t| full_width * DELTA_PENALTY <= t) {
return Ok(EstimateVerdict::Skip);
}

// Measure the actual FastLanes transposed-delta span. This is the lane-stride
// difference that gets bit-packed, not the lag-1 difference (which the transpose
// makes optimistic), so it is what truly drives the compressed size.
let (_bases, deltas) = vortex_fastlanes::delta_compress(&primitive, exec_ctx)?;
let delta_stats =
ArrayAndStats::new(deltas.into_array(), GenerateStatsOptions::default());
let span = delta_stats.integer_stats(exec_ctx).erased().max_minus_min();

// Bits needed to FoR-pack the residuals. A zero span means constant deltas, which
// SequenceScheme already captures more cheaply, so defer to it.
let delta_bits = match span.checked_ilog2() {
Some(l) => (l + 1) as f64,
None => return Ok(EstimateVerdict::Skip),
};

let ratio = full_width / delta_bits * DELTA_PENALTY;
if ratio <= 1.0 {
return Ok(EstimateVerdict::Skip);
}
Ok(EstimateVerdict::Ratio(ratio))
},
)))
}

fn compress(
&self,
compressor: &CascadingCompressor,
data: &ArrayAndStats,
compress_ctx: CompressorContext,
exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let primitive = data.array().clone().execute::<PrimitiveArray>(exec_ctx)?;
let len = primitive.len();
let (bases, deltas) = vortex_fastlanes::delta_compress(&primitive, exec_ctx)?;

let compressed_bases = compressor.compress_child(
&bases.into_array(),
&compress_ctx,
self.id(),
0,
exec_ctx,
)?;
let compressed_deltas = compressor.compress_child(
&deltas.into_array(),
&compress_ctx,
self.id(),
1,
exec_ctx,
)?;

Delta::try_new(compressed_bases, compressed_deltas, 0, len).map(IntoArray::into_array)
}
}
4 changes: 4 additions & 0 deletions vortex-btrblocks/src/schemes/integer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//! Integer compression schemes.

mod bitpacking;
#[cfg(feature = "unstable_encodings")]
mod delta;
mod for_;
mod rle;
mod runend;
Expand All @@ -15,6 +17,8 @@ mod zigzag;
mod pco;

pub use bitpacking::BitPackingScheme;
#[cfg(feature = "unstable_encodings")]
pub use delta::DeltaScheme;
pub use for_::FoRScheme;
#[cfg(feature = "pco")]
pub use pco::PcoScheme;
Expand Down
61 changes: 60 additions & 1 deletion vortex-btrblocks/src/schemes/integer/scheme_selection_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ fn test_sequence_compressed() -> VortexResult<()> {
fn test_rle_compressed() -> VortexResult<()> {
let mut values: Vec<i32> = Vec::new();
for i in 0..1024 {
values.extend(iter::repeat_n(i, 10));
// Scramble the per-run value so the data is run-length-dominant but not monotone: this
// keeps RunEnd the winner instead of Delta (whose residuals would be small on a smooth
// ramp).
let v = (i as u32).wrapping_mul(2_654_435_761) as i32;
values.extend(iter::repeat_n(v, 10));
}
let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
let btr = BtrBlocksCompressor::default();
Expand All @@ -152,3 +156,58 @@ fn test_rle_compressed() -> VortexResult<()> {
assert!(compressed.is::<RunEnd>());
Ok(())
}

/// A strictly-increasing column with small, irregular steps: not a perfect arithmetic sequence
/// (so Sequence skips), all-unique with no runs (so RunEnd/Dict skip), and a wide absolute range.
/// Delta's residuals are far smaller than the FoR span, so Delta should win and round-trip, and
/// it must appear at most once in the tree.
#[cfg(feature = "unstable_encodings")]
#[test]
fn test_delta_compressed() -> VortexResult<()> {
use vortex_array::assert_arrays_eq;
use vortex_fastlanes::Delta;

let mut rng = StdRng::seed_from_u64(7u64);
let mut value = 500_000i32;
let values: Vec<i32> = (0..4096)
.map(|_| {
value += 1 + (rng.next_u32() % 6) as i32;
value
})
.collect();
let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);

let btr = BtrBlocksCompressor::default();
let compressed = btr.compress(
&array.clone().into_array(),
&mut SESSION.create_execution_ctx(),
)?;
assert!(
compressed.is::<Delta>(),
"expected Delta, got tree:\n{}",
compressed.display_tree()
);
// Delta must appear at most once per tree: no Delta node may be nested under another.
assert!(
!has_nested_delta(&compressed, false),
"Delta was applied more than once in the tree:\n{}",
compressed.display_tree()
);
assert_arrays_eq!(compressed, array.into_array());
Ok(())
}

/// Returns true if any `Delta` array appears below an ancestor `Delta` in the tree.
#[cfg(feature = "unstable_encodings")]
fn has_nested_delta(array: &vortex_array::ArrayRef, under_delta: bool) -> bool {
use vortex_fastlanes::Delta;

let is_delta = array.is::<Delta>();
if is_delta && under_delta {
return true;
}
array
.children()
.iter()
.any(|child| has_nested_delta(child, under_delta || is_delta))
}
10 changes: 8 additions & 2 deletions vortex-file/src/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,16 @@ mod tests {
// Create a large file (> 1MB)
let mut buf = ByteBufferMut::empty();

// 1.5M integers -> ~6MB. We use a pattern to avoid Sequence encoding.
// 1.5M integers -> ~6MB. We use high-entropy (pseudo-random) values so the data does not
// compress well under any encoding (Sequence, RunEnd, Delta, ...), keeping the written
// file comfortably above 1MB.
let mut state = 0x9E37_79B9u32;
let array = Buffer::from(
(0i32..1_500_000)
.map(|i| if i % 2 == 0 { i } else { -i })
.map(|_| {
state = state.wrapping_mul(1_664_525).wrapping_add(1_013_904_223);
state as i32
})
.collect::<Vec<i32>>(),
)
.into_array();
Expand Down
Loading