Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion vortex-btrblocks/src/schemes/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::decimal::narrowed_decimal;
use vortex_array::dtype::DecimalType;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_decimal_byte_parts::DecimalByteParts;
use vortex_error::VortexResult;

Expand Down Expand Up @@ -47,7 +48,7 @@ impl Scheme for DecimalScheme {
_ctx: CompressorContext,
) -> CompressionEstimate {
// Decimal compression is almost always beneficial (narrowing + primitive compression).
CompressionEstimate::AlwaysUse
CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse)
}

fn compress(
Expand Down
26 changes: 14 additions & 12 deletions vortex-btrblocks/src/schemes/float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::dtype::PType;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_compressor::scheme::ChildSelection;
use vortex_compressor::scheme::DescendantExclusion;
use vortex_error::VortexResult;
Expand Down Expand Up @@ -88,15 +90,15 @@ impl Scheme for ALPScheme {
// ALP encodes floats as integers. Without integer compression afterward, the encoded ints
// are the same size.
if ctx.finished_cascading() {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

// We don't support ALP for f16.
if data.array_as_primitive().ptype() == PType::F16 {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down Expand Up @@ -154,10 +156,10 @@ impl Scheme for ALPRDScheme {
) -> CompressionEstimate {
// We don't support ALPRD for f16.
if data.array_as_primitive().ptype() == PType::F16 {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down Expand Up @@ -225,16 +227,16 @@ impl Scheme for NullDominatedSparseScheme {

// All-null arrays should be compressed as constant instead anyways.
if value_count == 0 {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

// If the majority (90%) of values are null, this will compress well.
if stats.null_count() as f64 / len > 0.9 {
return CompressionEstimate::Ratio(len / value_count as f64);
return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
}

// Otherwise we don't go this route.
CompressionEstimate::Skip
CompressionEstimate::Verdict(EstimateVerdict::Skip)
}

fn compress(
Expand Down Expand Up @@ -279,7 +281,7 @@ impl Scheme for PcoScheme {
_data: &mut ArrayAndStats,
_ctx: CompressorContext,
) -> CompressionEstimate {
CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down Expand Up @@ -326,14 +328,14 @@ impl Scheme for FloatRLEScheme {
) -> CompressionEstimate {
// RLE is only useful when we cascade it with another encoding.
if ctx.finished_cascading() {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

if data.float_stats().average_run_length() < super::integer::RUN_LENGTH_THRESHOLD {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down
80 changes: 44 additions & 36 deletions vortex-btrblocks/src/schemes/integer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use vortex_array::scalar::Scalar;
use vortex_compressor::builtins::FloatDictScheme;
use vortex_compressor::builtins::StringDictScheme;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_compressor::scheme::AncestorExclusion;
use vortex_compressor::scheme::ChildSelection;
use vortex_compressor::scheme::DescendantExclusion;
Expand Down Expand Up @@ -134,21 +136,21 @@ impl Scheme for FoRScheme {
// FoR only subtracts the min. Without further compression (e.g. BitPacking), the output is
// the same size.
if ctx.finished_cascading() {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

let stats = data.integer_stats();

// Only apply when the min is not already zero.
if stats.erased().min_is_zero() {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

// Difference between max and min.
let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() {
Some(l) => l + 1,
// If max-min == 0, then we should be compressing this as a constant array.
None => return CompressionEstimate::Skip,
None => return CompressionEstimate::Verdict(EstimateVerdict::Skip),
};

// If BitPacking can be applied (only non-negative values) and FoR doesn't reduce bit width
Expand All @@ -162,7 +164,7 @@ impl Scheme for FoRScheme {
{
let bitpack_bitwidth = max_log + 1;
if for_bitwidth >= bitpack_bitwidth {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}
}

Expand All @@ -173,7 +175,9 @@ impl Scheme for FoRScheme {
.try_into()
.vortex_expect("bit width must fit in u32");

CompressionEstimate::Ratio(full_width as f64 / for_bitwidth as f64)
CompressionEstimate::Verdict(EstimateVerdict::Ratio(
full_width as f64 / for_bitwidth as f64,
))
}

fn compress(
Expand Down Expand Up @@ -265,17 +269,17 @@ impl Scheme for ZigZagScheme {
// ZigZag only transforms negative values to positive. Without further compression,
// the output is the same size.
if ctx.finished_cascading() {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

let stats = data.integer_stats();

// ZigZag is only useful when there are negative values.
if !stats.erased().min_is_negative() {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down Expand Up @@ -314,10 +318,10 @@ impl Scheme for BitPackingScheme {

// BitPacking only works for non-negative values.
if stats.erased().min_is_negative() {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down Expand Up @@ -443,12 +447,12 @@ impl Scheme for SparseScheme {

// All-null arrays should be compressed as constant instead anyways.
if value_count == 0 {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

// If the majority (90%) of values are null, this will compress well.
if stats.null_count() as f64 / len > 0.9 {
return CompressionEstimate::Ratio(len / value_count as f64);
return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
}

let (_, most_frequent_count) = stats
Expand All @@ -460,18 +464,20 @@ impl Scheme for SparseScheme {

// If the most frequent value is the only value, we should compress as constant instead.
if most_frequent_count == value_count {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}
debug_assert!(value_count > most_frequent_count);

// See if the most frequent value accounts for >= 90% of the set values.
let freq = most_frequent_count as f64 / value_count as f64;
if freq < 0.9 {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

// We only store the positions of the non-top values.
CompressionEstimate::Ratio(value_count as f64 / (value_count - most_frequent_count) as f64)
CompressionEstimate::Verdict(EstimateVerdict::Ratio(
value_count as f64 / (value_count - most_frequent_count) as f64,
))
}

fn compress(
Expand Down Expand Up @@ -603,10 +609,10 @@ impl Scheme for RunEndScheme {
) -> CompressionEstimate {
// If the run length is below the threshold, drop it.
if data.integer_stats().average_run_length() < RUN_END_THRESHOLD {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down Expand Up @@ -668,14 +674,14 @@ impl Scheme for SequenceScheme {
// It is pointless checking if a sample is a sequence since it will not correspond to the
// entire array.
if ctx.is_sample() {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

let stats = data.integer_stats();

// `SequenceArray` does not support nulls.
if stats.null_count() > 0 {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

// If the distinct_values_count was computed, and not all values are unique, then this
Expand All @@ -684,23 +690,25 @@ impl Scheme for SequenceScheme {
.distinct_count()
.is_some_and(|count| count as usize != data.array_len())
{
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

// TODO(connor): Why do we sequence encode the whole thing and then throw it away? And then
// why do we divide the ratio by 2???

CompressionEstimate::Estimate(Box::new(|_compressor, data, _ctx| {
let Some(encoded) = sequence_encode(data.array_as_primitive())? else {
// If we are unable to sequence encode this array, make sure we skip.
return Ok(CompressionEstimate::Skip);
};

// TODO(connor): This doesn't really make sense?
// Since two values are required to store base and multiplier the compression ratio is
// divided by 2.
Ok(CompressionEstimate::Ratio(encoded.len() as f64 / 2.0))
}))
CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new(
|_compressor, data, _ctx| {
let Some(encoded) = sequence_encode(data.array_as_primitive())? else {
// If we are unable to sequence encode this array, make sure we skip.
return Ok(EstimateVerdict::Skip);
};

// TODO(connor): This doesn't really make sense?
// Since two values are required to store base and multiplier the compression ratio is
// divided by 2.
Ok(EstimateVerdict::Ratio(encoded.len() as f64 / 2.0))
},
)))
}

fn compress(
Expand Down Expand Up @@ -738,10 +746,10 @@ impl Scheme for PcoScheme {

// Pco does not support I8 or U8.
if matches!(data.array_as_primitive().ptype(), PType::I8 | PType::U8) {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down Expand Up @@ -865,14 +873,14 @@ impl Scheme for IntRLEScheme {
) -> CompressionEstimate {
// RLE is only useful when we cascade it with another encoding.
if ctx.finished_cascading() {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

if data.integer_stats().average_run_length() < RUN_LENGTH_THRESHOLD {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down
14 changes: 8 additions & 6 deletions vortex-btrblocks/src/schemes/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use vortex_array::arrays::VarBinArray;
use vortex_array::arrays::primitive::PrimitiveArrayExt;
use vortex_array::arrays::varbin::VarBinArrayExt;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_compressor::estimate::EstimateVerdict;
use vortex_compressor::scheme::ChildSelection;
use vortex_compressor::scheme::DescendantExclusion;
use vortex_error::VortexResult;
Expand Down Expand Up @@ -73,7 +75,7 @@ impl Scheme for FSSTScheme {
_data: &mut ArrayAndStats,
_ctx: CompressorContext,
) -> CompressionEstimate {
CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down Expand Up @@ -161,16 +163,16 @@ impl Scheme for NullDominatedSparseScheme {

// All-null arrays should be compressed as constant instead anyways.
if value_count == 0 {
return CompressionEstimate::Skip;
return CompressionEstimate::Verdict(EstimateVerdict::Skip);
}

// If the majority (90%) of values are null, this will compress well.
if stats.null_count() as f64 / len > 0.9 {
return CompressionEstimate::Ratio(len / value_count as f64);
return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64));
}

// Otherwise we don't go this route.
CompressionEstimate::Skip
CompressionEstimate::Verdict(EstimateVerdict::Skip)
}

fn compress(
Expand Down Expand Up @@ -216,7 +218,7 @@ impl Scheme for ZstdScheme {
_data: &mut ArrayAndStats,
_ctx: CompressorContext,
) -> CompressionEstimate {
CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down Expand Up @@ -245,7 +247,7 @@ impl Scheme for ZstdBuffersScheme {
_data: &mut ArrayAndStats,
_ctx: CompressorContext,
) -> CompressionEstimate {
CompressionEstimate::Sample
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
Expand Down
Loading
Loading