From 4ddf9e3c02cec3aaf9abbf3729b2c6f6e3d5f1b4 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Mon, 13 Apr 2026 13:09:43 -0400 Subject: [PATCH] more robust types in the compressor Signed-off-by: Connor Tsui --- vortex-btrblocks/src/schemes/decimal.rs | 3 +- vortex-btrblocks/src/schemes/float.rs | 26 +- vortex-btrblocks/src/schemes/integer.rs | 80 +++-- vortex-btrblocks/src/schemes/string.rs | 14 +- vortex-btrblocks/src/schemes/temporal.rs | 3 +- vortex-compressor/public-api.lock | 32 +- .../src/builtins/constant/bool.rs | 9 +- .../src/builtins/constant/float.rs | 26 +- .../src/builtins/constant/integer.rs | 13 +- .../src/builtins/constant/string.rs | 24 +- vortex-compressor/src/builtins/dict/float.rs | 8 +- .../src/builtins/dict/integer.rs | 9 +- vortex-compressor/src/builtins/dict/string.rs | 8 +- vortex-compressor/src/compressor.rs | 332 ++++++++++++++++-- vortex-compressor/src/estimate.rs | 56 +-- vortex-compressor/src/scheme.rs | 21 +- .../src/encodings/turboquant/scheme.rs | 5 +- 17 files changed, 494 insertions(+), 175 deletions(-) diff --git a/vortex-btrblocks/src/schemes/decimal.rs b/vortex-btrblocks/src/schemes/decimal.rs index ce5262ba5ce..3b9def06633 100644 --- a/vortex-btrblocks/src/schemes/decimal.rs +++ b/vortex-btrblocks/src/schemes/decimal.rs @@ -11,6 +11,7 @@ use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::decimal::narrowed_decimal; use vortex_array::dtype::DecimalType; use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::EstimateVerdict; use vortex_decimal_byte_parts::DecimalByteParts; use vortex_error::VortexResult; @@ -47,7 +48,7 @@ impl Scheme for DecimalScheme { _ctx: CompressorContext, ) -> CompressionEstimate { // Decimal compression is almost always beneficial (narrowing + primitive compression). 
- CompressionEstimate::AlwaysUse + CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) } fn compress( diff --git a/vortex-btrblocks/src/schemes/float.rs b/vortex-btrblocks/src/schemes/float.rs index 0af2b30974e..3d2557c63c7 100644 --- a/vortex-btrblocks/src/schemes/float.rs +++ b/vortex-btrblocks/src/schemes/float.rs @@ -21,6 +21,8 @@ use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::dtype::PType; use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_compressor::estimate::EstimateVerdict; use vortex_compressor::scheme::ChildSelection; use vortex_compressor::scheme::DescendantExclusion; use vortex_error::VortexResult; @@ -88,15 +90,15 @@ impl Scheme for ALPScheme { // ALP encodes floats as integers. Without integer compression afterward, the encoded ints // are the same size. if ctx.finished_cascading() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // We don't support ALP for f16. if data.array_as_primitive().ptype() == PType::F16 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( @@ -154,10 +156,10 @@ impl Scheme for ALPRDScheme { ) -> CompressionEstimate { // We don't support ALPRD for f16. if data.array_as_primitive().ptype() == PType::F16 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( @@ -225,16 +227,16 @@ impl Scheme for NullDominatedSparseScheme { // All-null arrays should be compressed as constant instead anyways. 
if value_count == 0 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // If the majority (90%) of values is null, this will compress well. if stats.null_count() as f64 / len > 0.9 { - return CompressionEstimate::Ratio(len / value_count as f64); + return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64)); } // Otherwise we don't go this route. - CompressionEstimate::Skip + CompressionEstimate::Verdict(EstimateVerdict::Skip) } fn compress( @@ -279,7 +281,7 @@ impl Scheme for PcoScheme { _data: &mut ArrayAndStats, _ctx: CompressorContext, ) -> CompressionEstimate { - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( @@ -326,14 +328,14 @@ impl Scheme for FloatRLEScheme { ) -> CompressionEstimate { // RLE is only useful when we cascade it with another encoding. if ctx.finished_cascading() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } if data.float_stats().average_run_length() < super::integer::RUN_LENGTH_THRESHOLD { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( diff --git a/vortex-btrblocks/src/schemes/integer.rs b/vortex-btrblocks/src/schemes/integer.rs index 100ebdfe68c..47fc52225aa 100644 --- a/vortex-btrblocks/src/schemes/integer.rs +++ b/vortex-btrblocks/src/schemes/integer.rs @@ -17,6 +17,8 @@ use vortex_array::scalar::Scalar; use vortex_compressor::builtins::FloatDictScheme; use vortex_compressor::builtins::StringDictScheme; use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_compressor::estimate::EstimateVerdict; use vortex_compressor::scheme::AncestorExclusion; use vortex_compressor::scheme::ChildSelection; use vortex_compressor::scheme::DescendantExclusion; 
@@ -134,21 +136,21 @@ impl Scheme for FoRScheme { // FoR only subtracts the min. Without further compression (e.g. BitPacking), the output is // the same size. if ctx.finished_cascading() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } let stats = data.integer_stats(); // Only apply when the min is not already zero. if stats.erased().min_is_zero() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // Difference between max and min. let for_bitwidth = match stats.erased().max_minus_min().checked_ilog2() { Some(l) => l + 1, // If max-min == 0, the we should be compressing this as a constant array. - None => return CompressionEstimate::Skip, + None => return CompressionEstimate::Verdict(EstimateVerdict::Skip), }; // If BitPacking can be applied (only non-negative values) and FoR doesn't reduce bit width @@ -162,7 +164,7 @@ impl Scheme for FoRScheme { { let bitpack_bitwidth = max_log + 1; if for_bitwidth >= bitpack_bitwidth { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } } @@ -173,7 +175,9 @@ impl Scheme for FoRScheme { .try_into() .vortex_expect("bit width must fit in u32"); - CompressionEstimate::Ratio(full_width as f64 / for_bitwidth as f64) + CompressionEstimate::Verdict(EstimateVerdict::Ratio( + full_width as f64 / for_bitwidth as f64, + )) } fn compress( @@ -265,17 +269,17 @@ impl Scheme for ZigZagScheme { // ZigZag only transforms negative values to positive. Without further compression, // the output is the same size. if ctx.finished_cascading() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } let stats = data.integer_stats(); // ZigZag is only useful when there are negative values. 
if !stats.erased().min_is_negative() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( @@ -314,10 +318,10 @@ impl Scheme for BitPackingScheme { // BitPacking only works for non-negative values. if stats.erased().min_is_negative() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( @@ -443,12 +447,12 @@ impl Scheme for SparseScheme { // All-null arrays should be compressed as constant instead anyways. if value_count == 0 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // If the majority (90%) of values is null, this will compress well. if stats.null_count() as f64 / len > 0.9 { - return CompressionEstimate::Ratio(len / value_count as f64); + return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64)); } let (_, most_frequent_count) = stats @@ -460,18 +464,20 @@ impl Scheme for SparseScheme { // If the most frequent value is the only value, we should compress as constant instead. if most_frequent_count == value_count { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } debug_assert!(value_count > most_frequent_count); // See if the most frequent value accounts for >= 90% of the set values. let freq = most_frequent_count as f64 / value_count as f64; if freq < 0.9 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // We only store the positions of the non-top values. 
- CompressionEstimate::Ratio(value_count as f64 / (value_count - most_frequent_count) as f64) + CompressionEstimate::Verdict(EstimateVerdict::Ratio( + value_count as f64 / (value_count - most_frequent_count) as f64, + )) } fn compress( @@ -603,10 +609,10 @@ impl Scheme for RunEndScheme { ) -> CompressionEstimate { // If the run length is below the threshold, drop it. if data.integer_stats().average_run_length() < RUN_END_THRESHOLD { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( @@ -668,14 +674,14 @@ impl Scheme for SequenceScheme { // It is pointless checking if a sample is a sequence since it will not correspond to the // entire array. if ctx.is_sample() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } let stats = data.integer_stats(); // `SequenceArray` does not support nulls. if stats.null_count() > 0 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // If the distinct_values_count was computed, and not all values are unique, then this @@ -684,23 +690,25 @@ impl Scheme for SequenceScheme { .distinct_count() .is_some_and(|count| count as usize != data.array_len()) { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // TODO(connor): Why do we sequence encode the whole thing and then throw it away? And then // why do we divide the ratio by 2??? - CompressionEstimate::Estimate(Box::new(|_compressor, data, _ctx| { - let Some(encoded) = sequence_encode(data.array_as_primitive())? else { - // If we are unable to sequence encode this array, make sure we skip. - return Ok(CompressionEstimate::Skip); - }; - - // TODO(connor): This doesn't really make sense? - // Since two values are required to store base and multiplier the compression ratio is - // divided by 2. 
- Ok(CompressionEstimate::Ratio(encoded.len() as f64 / 2.0)) - })) + CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new( + |_compressor, data, _ctx| { + let Some(encoded) = sequence_encode(data.array_as_primitive())? else { + // If we are unable to sequence encode this array, make sure we skip. + return Ok(EstimateVerdict::Skip); + }; + + // TODO(connor): This doesn't really make sense? + // Since two values are required to store base and multiplier the compression ratio is + // divided by 2. + Ok(EstimateVerdict::Ratio(encoded.len() as f64 / 2.0)) + }, + ))) } fn compress( @@ -738,10 +746,10 @@ impl Scheme for PcoScheme { // Pco does not support I8 or U8. if matches!(data.array_as_primitive().ptype(), PType::I8 | PType::U8) { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( @@ -865,14 +873,14 @@ impl Scheme for IntRLEScheme { ) -> CompressionEstimate { // RLE is only useful when we cascade it with another encoding. 
if ctx.finished_cascading() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } if data.integer_stats().average_run_length() < RUN_LENGTH_THRESHOLD { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( diff --git a/vortex-btrblocks/src/schemes/string.rs b/vortex-btrblocks/src/schemes/string.rs index 331fad9f8cb..5b21d6c1d59 100644 --- a/vortex-btrblocks/src/schemes/string.rs +++ b/vortex-btrblocks/src/schemes/string.rs @@ -11,6 +11,8 @@ use vortex_array::arrays::VarBinArray; use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::arrays::varbin::VarBinArrayExt; use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_compressor::estimate::EstimateVerdict; use vortex_compressor::scheme::ChildSelection; use vortex_compressor::scheme::DescendantExclusion; use vortex_error::VortexResult; @@ -73,7 +75,7 @@ impl Scheme for FSSTScheme { _data: &mut ArrayAndStats, _ctx: CompressorContext, ) -> CompressionEstimate { - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( @@ -161,16 +163,16 @@ impl Scheme for NullDominatedSparseScheme { // All-null arrays should be compressed as constant instead anyways. if value_count == 0 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // If the majority (90%) of values is null, this will compress well. if stats.null_count() as f64 / len > 0.9 { - return CompressionEstimate::Ratio(len / value_count as f64); + return CompressionEstimate::Verdict(EstimateVerdict::Ratio(len / value_count as f64)); } // Otherwise we don't go this route. 
- CompressionEstimate::Skip + CompressionEstimate::Verdict(EstimateVerdict::Skip) } fn compress( @@ -216,7 +218,7 @@ impl Scheme for ZstdScheme { _data: &mut ArrayAndStats, _ctx: CompressorContext, ) -> CompressionEstimate { - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( @@ -245,7 +247,7 @@ impl Scheme for ZstdBuffersScheme { _data: &mut ArrayAndStats, _ctx: CompressorContext, ) -> CompressionEstimate { - CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( diff --git a/vortex-btrblocks/src/schemes/temporal.rs b/vortex-btrblocks/src/schemes/temporal.rs index 306ccaf982d..47d9ae81cfb 100644 --- a/vortex-btrblocks/src/schemes/temporal.rs +++ b/vortex-btrblocks/src/schemes/temporal.rs @@ -15,6 +15,7 @@ use vortex_array::dtype::extension::Matcher; use vortex_array::extension::datetime::AnyTemporal; use vortex_array::extension::datetime::TemporalMetadata; use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::EstimateVerdict; use vortex_datetime_parts::DateTimeParts; use vortex_datetime_parts::TemporalParts; use vortex_datetime_parts::split_temporal; @@ -62,7 +63,7 @@ impl Scheme for TemporalScheme { _ctx: CompressorContext, ) -> CompressionEstimate { // Temporal compression (splitting into parts) is almost always beneficial. 
- CompressionEstimate::AlwaysUse + CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) } fn compress( diff --git a/vortex-compressor/public-api.lock b/vortex-compressor/public-api.lock index 694f3d676f0..5d9957c2ca7 100644 --- a/vortex-compressor/public-api.lock +++ b/vortex-compressor/public-api.lock @@ -308,21 +308,37 @@ pub mod vortex_compressor::estimate pub enum vortex_compressor::estimate::CompressionEstimate -pub vortex_compressor::estimate::CompressionEstimate::AlwaysUse +pub vortex_compressor::estimate::CompressionEstimate::Deferred(vortex_compressor::estimate::DeferredEstimate) -pub vortex_compressor::estimate::CompressionEstimate::Estimate(alloc::boxed::Box<vortex_compressor::estimate::EstimateFn>) +pub vortex_compressor::estimate::CompressionEstimate::Verdict(vortex_compressor::estimate::EstimateVerdict) -pub vortex_compressor::estimate::CompressionEstimate::Ratio(f64) +impl core::fmt::Debug for vortex_compressor::estimate::CompressionEstimate -pub vortex_compressor::estimate::CompressionEstimate::Sample +pub fn vortex_compressor::estimate::CompressionEstimate::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result -pub vortex_compressor::estimate::CompressionEstimate::Skip +pub enum vortex_compressor::estimate::DeferredEstimate -impl core::fmt::Debug for vortex_compressor::estimate::CompressionEstimate +pub vortex_compressor::estimate::DeferredEstimate::Callback(alloc::boxed::Box<vortex_compressor::estimate::EstimateFn>) -pub fn vortex_compressor::estimate::CompressionEstimate::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +pub vortex_compressor::estimate::DeferredEstimate::Sample + +impl core::fmt::Debug for vortex_compressor::estimate::DeferredEstimate + +pub fn vortex_compressor::estimate::DeferredEstimate::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub enum vortex_compressor::estimate::EstimateVerdict + +pub vortex_compressor::estimate::EstimateVerdict::AlwaysUse + +pub vortex_compressor::estimate::EstimateVerdict::Ratio(f64) + +pub
vortex_compressor::estimate::EstimateVerdict::Skip + +impl core::fmt::Debug for vortex_compressor::estimate::EstimateVerdict + +pub fn vortex_compressor::estimate::EstimateVerdict::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result -pub type vortex_compressor::estimate::EstimateFn = (dyn core::ops::function::FnOnce(&vortex_compressor::CascadingCompressor, &mut vortex_compressor::stats::ArrayAndStats, vortex_compressor::ctx::CompressorContext) -> vortex_error::VortexResult<vortex_compressor::estimate::CompressionEstimate> + core::marker::Send + core::marker::Sync) +pub type vortex_compressor::estimate::EstimateFn = (dyn core::ops::function::FnOnce(&vortex_compressor::CascadingCompressor, &mut vortex_compressor::stats::ArrayAndStats, vortex_compressor::ctx::CompressorContext) -> vortex_error::VortexResult<vortex_compressor::estimate::EstimateVerdict> + core::marker::Send + core::marker::Sync) pub mod vortex_compressor::scheme diff --git a/vortex-compressor/src/builtins/constant/bool.rs b/vortex-compressor/src/builtins/constant/bool.rs index 62e156379e9..335bd29c2ba 100644 --- a/vortex-compressor/src/builtins/constant/bool.rs +++ b/vortex-compressor/src/builtins/constant/bool.rs @@ -12,6 +12,7 @@ use crate::builtins::BoolConstantScheme; use crate::builtins::constant::compress_constant_array_with_validity; use crate::ctx::CompressorContext; use crate::estimate::CompressionEstimate; +use crate::estimate::EstimateVerdict; use crate::scheme::Scheme; use crate::stats::ArrayAndStats; @@ -32,7 +33,7 @@ impl Scheme for BoolConstantScheme { // Constant detection on a sample is a false positive, since the sample being constant does // not mean the full array is constant. if ctx.is_sample() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } let array_len = data.array().len(); @@ -41,14 +42,14 @@ impl Scheme for BoolConstantScheme { // We want to use `Constant` if there are only nulls in the array.
if stats.value_count() == 0 { debug_assert_eq!(stats.null_count() as usize, array_len); - return CompressionEstimate::AlwaysUse; + return CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse); } if stats.is_constant() { - return CompressionEstimate::AlwaysUse; + return CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse); } - CompressionEstimate::Skip + CompressionEstimate::Verdict(EstimateVerdict::Skip) } fn compress( diff --git a/vortex-compressor/src/builtins/constant/float.rs b/vortex-compressor/src/builtins/constant/float.rs index df8ab7464b6..6cee2014334 100644 --- a/vortex-compressor/src/builtins/constant/float.rs +++ b/vortex-compressor/src/builtins/constant/float.rs @@ -14,6 +14,8 @@ use crate::builtins::FloatConstantScheme; use crate::builtins::constant::compress_constant_array_with_validity; use crate::ctx::CompressorContext; use crate::estimate::CompressionEstimate; +use crate::estimate::DeferredEstimate; +use crate::estimate::EstimateVerdict; use crate::scheme::Scheme; use crate::stats::ArrayAndStats; @@ -34,7 +36,7 @@ impl Scheme for FloatConstantScheme { // Constant detection on a sample is a false positive, since the sample being constant does // not mean the full array is constant. if ctx.is_sample() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } let array_len = data.array().len(); @@ -43,17 +45,17 @@ impl Scheme for FloatConstantScheme { // Note that we only compute distinct counts if other schemes have requested it. if let Some(distinct_count) = stats.distinct_count() { if distinct_count > 1 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } else { debug_assert_eq!(distinct_count, 1); - return CompressionEstimate::AlwaysUse; + return CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse); } } // We want to use `Constant` if there are only nulls in the array. 
if stats.value_count() == 0 { debug_assert_eq!(stats.null_count() as usize, array_len); - return CompressionEstimate::AlwaysUse; + return CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse); } // TODO(connor): Can we be smart here with the max and min like with integers? @@ -61,13 +63,15 @@ impl Scheme for FloatConstantScheme { // Otherwise our best bet is to actually check if the array is constant. // This is an expensive check, but in practice the distinct count is known because we often // include dictionary encoding in our set of schemes, so we rarely call this. - CompressionEstimate::Estimate(Box::new(|compressor, data, _ctx| { - if is_constant(data.array(), &mut compressor.execution_ctx())? { - Ok(CompressionEstimate::AlwaysUse) - } else { - Ok(CompressionEstimate::Skip) - } - })) + CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new( + |compressor, data, _ctx| { + if is_constant(data.array(), &mut compressor.execution_ctx())? { + Ok(EstimateVerdict::AlwaysUse) + } else { + Ok(EstimateVerdict::Skip) + } + }, + ))) } fn compress( diff --git a/vortex-compressor/src/builtins/constant/integer.rs b/vortex-compressor/src/builtins/constant/integer.rs index 0264893e5c8..1062664ca69 100644 --- a/vortex-compressor/src/builtins/constant/integer.rs +++ b/vortex-compressor/src/builtins/constant/integer.rs @@ -13,6 +13,7 @@ use crate::builtins::IntConstantScheme; use crate::builtins::constant::compress_constant_array_with_validity; use crate::ctx::CompressorContext; use crate::estimate::CompressionEstimate; +use crate::estimate::EstimateVerdict; use crate::scheme::Scheme; use crate::stats::ArrayAndStats; @@ -33,7 +34,7 @@ impl Scheme for IntConstantScheme { // Constant detection on a sample is a false positive, since the sample being constant does // not mean the full array is constant. 
if ctx.is_sample() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } let array_len = data.array().len(); @@ -42,24 +43,24 @@ impl Scheme for IntConstantScheme { // Note that we only compute distinct counts if other schemes have requested it. if let Some(distinct_count) = stats.distinct_count() { if distinct_count > 1 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } else { debug_assert_eq!(distinct_count, 1); - return CompressionEstimate::AlwaysUse; + return CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse); } } // We want to use `Constant` if there are only nulls in the array. if stats.value_count() == 0 { debug_assert_eq!(stats.null_count() as usize, array_len); - return CompressionEstimate::AlwaysUse; + return CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse); } // Otherwise, use the max and min to determine if there is a single value. match stats.erased().max_minus_min().checked_ilog2() { - Some(_) => CompressionEstimate::Skip, + Some(_) => CompressionEstimate::Verdict(EstimateVerdict::Skip), // If max-min == 0, then we know that there is only 1 value. 
- None => CompressionEstimate::AlwaysUse, + None => CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse), } } diff --git a/vortex-compressor/src/builtins/constant/string.rs b/vortex-compressor/src/builtins/constant/string.rs index 96e4e7ba674..694eae16872 100644 --- a/vortex-compressor/src/builtins/constant/string.rs +++ b/vortex-compressor/src/builtins/constant/string.rs @@ -14,6 +14,8 @@ use crate::builtins::StringConstantScheme; use crate::builtins::constant::compress_constant_array_with_validity; use crate::ctx::CompressorContext; use crate::estimate::CompressionEstimate; +use crate::estimate::DeferredEstimate; +use crate::estimate::EstimateVerdict; use crate::scheme::Scheme; use crate::stats::ArrayAndStats; @@ -34,7 +36,7 @@ impl Scheme for StringConstantScheme { // Constant detection on a sample is a false positive, since the sample being constant does // not mean the full array is constant. if ctx.is_sample() { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } let array_len = data.array().len(); @@ -43,25 +45,27 @@ impl Scheme for StringConstantScheme { // We want to use `Constant` if there are only nulls in the array. if stats.value_count() == 0 { debug_assert_eq!(stats.null_count() as usize, array_len); - return CompressionEstimate::AlwaysUse; + return CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse); } // Since the estimated distinct count is always going to be less than or equal to the actual // distinct count, if this is not equal to 1 the actual is definitely not equal to 1. if stats.estimated_distinct_count().is_some_and(|c| c > 1) { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // Otherwise our best bet is to actually check if the array is constant. // This is an expensive check, but the alternative of not compressing a constant array is // far less preferable. 
- CompressionEstimate::Estimate(Box::new(|compressor, data, _ctx| { - if is_constant(data.array(), &mut compressor.execution_ctx())? { - Ok(CompressionEstimate::AlwaysUse) - } else { - Ok(CompressionEstimate::Skip) - } - })) + CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new( + |compressor, data, _ctx| { + if is_constant(data.array(), &mut compressor.execution_ctx())? { + Ok(EstimateVerdict::AlwaysUse) + } else { + Ok(EstimateVerdict::Skip) + } + }, + ))) } fn compress( diff --git a/vortex-compressor/src/builtins/dict/float.rs b/vortex-compressor/src/builtins/dict/float.rs index 6a188108612..18d5c216aa9 100644 --- a/vortex-compressor/src/builtins/dict/float.rs +++ b/vortex-compressor/src/builtins/dict/float.rs @@ -28,6 +28,8 @@ use crate::builtins::IntDictScheme; use crate::builtins::is_float_primitive; use crate::ctx::CompressorContext; use crate::estimate::CompressionEstimate; +use crate::estimate::DeferredEstimate; +use crate::estimate::EstimateVerdict; use crate::scheme::ChildSelection; use crate::scheme::DescendantExclusion; use crate::scheme::Scheme; @@ -86,7 +88,7 @@ impl Scheme for FloatDictScheme { let stats = data.float_stats(); if stats.value_count() == 0 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } let distinct_values_count = stats.distinct_count().vortex_expect( @@ -95,11 +97,11 @@ impl Scheme for FloatDictScheme { // If > 50% of the values are distinct, skip dictionary scheme. if distinct_values_count > stats.value_count() / 2 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // Let sampling determine the expected ratio. 
- CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( diff --git a/vortex-compressor/src/builtins/dict/integer.rs b/vortex-compressor/src/builtins/dict/integer.rs index d956a90990a..0693612aaee 100644 --- a/vortex-compressor/src/builtins/dict/integer.rs +++ b/vortex-compressor/src/builtins/dict/integer.rs @@ -26,6 +26,7 @@ use crate::builtins::IntDictScheme; use crate::builtins::is_integer_primitive; use crate::ctx::CompressorContext; use crate::estimate::CompressionEstimate; +use crate::estimate::EstimateVerdict; use crate::scheme::Scheme; use crate::scheme::SchemeExt; use crate::stats::ArrayAndStats; @@ -62,7 +63,7 @@ impl Scheme for IntDictScheme { let stats = data.integer_stats(); if stats.value_count() == 0 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } let distinct_values_count = stats.distinct_count().vortex_expect( @@ -71,7 +72,7 @@ impl Scheme for IntDictScheme { // If > 50% of the values are distinct, skip dictionary scheme. if distinct_values_count > stats.value_count() / 2 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // Ignore nulls encoding for the estimate. We only focus on values. 
@@ -92,7 +93,9 @@ impl Scheme for IntDictScheme { let before = stats.value_count() as usize * bit_width; - CompressionEstimate::Ratio(before as f64 / (values_size + codes_size) as f64) + CompressionEstimate::Verdict(EstimateVerdict::Ratio( + before as f64 / (values_size + codes_size) as f64, + )) } fn compress( diff --git a/vortex-compressor/src/builtins/dict/string.rs b/vortex-compressor/src/builtins/dict/string.rs index 399910a00ff..efc5511fc29 100644 --- a/vortex-compressor/src/builtins/dict/string.rs +++ b/vortex-compressor/src/builtins/dict/string.rs @@ -24,6 +24,8 @@ use crate::builtins::StringDictScheme; use crate::builtins::is_utf8_string; use crate::ctx::CompressorContext; use crate::estimate::CompressionEstimate; +use crate::estimate::DeferredEstimate; +use crate::estimate::EstimateVerdict; use crate::scheme::ChildSelection; use crate::scheme::DescendantExclusion; use crate::scheme::Scheme; @@ -71,7 +73,7 @@ impl Scheme for StringDictScheme { let stats = data.string_stats(); if stats.value_count() == 0 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } let estimated_distinct_values_count = stats.estimated_distinct_count().vortex_expect( @@ -80,11 +82,11 @@ impl Scheme for StringDictScheme { // If > 50% of the values are distinct, skip dictionary scheme. if estimated_distinct_values_count > stats.value_count() / 2 { - return CompressionEstimate::Skip; + return CompressionEstimate::Verdict(EstimateVerdict::Skip); } // Let sampling determine the expected ratio. 
- CompressionEstimate::Sample + CompressionEstimate::Deferred(DeferredEstimate::Sample) } fn compress( diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index 099cc858f92..d1d7cc28958 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -33,11 +33,12 @@ use vortex_array::dtype::Nullability; use vortex_array::scalar::Scalar; use vortex_error::VortexResult; use vortex_error::vortex_bail; -use vortex_error::vortex_panic; use crate::builtins::IntDictScheme; use crate::ctx::CompressorContext; use crate::estimate::CompressionEstimate; +use crate::estimate::DeferredEstimate; +use crate::estimate::EstimateVerdict; use crate::estimate::estimate_compression_ratio_with_sampling; use crate::estimate::is_better_ratio; use crate::scheme::ChildSelection; @@ -63,6 +64,15 @@ mod root_list_children { pub const SIZES: usize = 2; } +/// The winning estimate for a scheme after all deferred work has been resolved. +#[derive(Debug, Clone, Copy, PartialEq)] +enum WinnerEstimate { + /// The scheme must be used immediately. + AlwaysUse, + /// The scheme won by numeric compression ratio. + Ratio(f64), +} + /// The main compressor type implementing cascading adaptive compression. /// /// This compressor applies adaptive compression [`Scheme`]s to arrays based on their data types and @@ -72,7 +82,7 @@ mod root_list_children { /// The compressor works by: /// 1. Canonicalizing input arrays to a standard representation. /// 2. Pre-filtering schemes by [`Scheme::matches`] and exclusion rules. -/// 3. Evaluating each matching scheme's compression ratio on a sample. +/// 3. Evaluating each matching scheme's compression estimate and resolving deferred work. /// 4. Compressing with the best scheme and verifying the result is smaller. /// /// No scheme may appear twice in a cascade chain. 
The compressor enforces this automatically @@ -311,13 +321,16 @@ impl CascadingCompressor { let mut data = ArrayAndStats::new(array, merged_opts); - if let Some(winner) = self.choose_best_scheme(&eligible_schemes, &mut data, ctx.clone())? { - // TODO(connor): Add a tracing warning here if compression with the chosen scheme - // failed, since there was likely more we could have done while choosing schemes. - + // TODO(connor): Add tracing support for logging the winner estimate. + if let Some((winner, _winner_estimate)) = + self.choose_best_scheme(&eligible_schemes, &mut data, ctx.clone())? + { // Sampling and estimation chose a scheme, so let's compress the whole array with it. let compressed = winner.compress(self, &mut data, ctx)?; + // TODO(connor): Add a tracing warning here if compression with the chosen scheme + // failed, since there was likely more we could have done while choosing schemes. + // Only choose the compressed array if it is smaller than the canonical one. if compressed.nbytes() < before_nbytes { // TODO(connor): Add a tracing warning here too. @@ -329,9 +342,9 @@ impl CascadingCompressor { Ok(data.into_array()) } - /// Calls [`expected_compression_ratio`] on each candidate and returns the scheme with the - /// highest ratio, or `None` if no scheme exceeds 1.0. Ties are broken by registration order - /// (earlier in the list wins). + /// Calls [`expected_compression_ratio`] on each candidate and returns the winning scheme and + /// resolved winner estimate, or `None` if no scheme exceeds 1.0. Ties are broken by + /// registration order (earlier in the list wins). 
/// /// [`expected_compression_ratio`]: Scheme::expected_compression_ratio fn choose_best_scheme( @@ -339,7 +352,7 @@ impl CascadingCompressor { schemes: &[&'static dyn Scheme], data: &mut ArrayAndStats, ctx: CompressorContext, - ) -> VortexResult<Option<&'static dyn Scheme>> { + ) -> VortexResult<Option<(&'static dyn Scheme, WinnerEstimate)>> { let mut best: Option<(&'static dyn Scheme, f64)> = None; // TODO(connor): Might want to use an `im` data structure inside of `ctx` if the clones here @@ -347,15 +360,17 @@ impl CascadingCompressor { for &scheme in schemes { let estimate = scheme.expected_compression_ratio(data, ctx.clone()); + // TODO(connor): Rather than computing the deferred estimates eagerly, it would be + // better to look at all quick estimates and see if it makes sense to sample at all. match estimate { - CompressionEstimate::Skip => {} - CompressionEstimate::AlwaysUse => return Ok(Some(scheme)), - CompressionEstimate::Ratio(ratio) => { - if is_better_ratio(ratio, &best) { - best = Some((scheme, ratio)); + CompressionEstimate::Verdict(verdict) => { + if let Some(winner_estimate) = + Self::check_and_update_estimate_verdict(&mut best, scheme, verdict) + { + return Ok(Some((scheme, winner_estimate))); } } - CompressionEstimate::Sample => { + CompressionEstimate::Deferred(DeferredEstimate::Sample) => { let sample_ratio = estimate_compression_ratio_with_sampling( scheme, self, @@ -367,31 +382,36 @@ impl CascadingCompressor { best = Some((scheme, sample_ratio)); } } - // TODO(connor): Is there a way to deduplicate some of this code? 
- CompressionEstimate::Estimate(estimate_callback) => { - let estimate = estimate_callback(self, data, ctx.clone())?; - - match estimate { - CompressionEstimate::Skip => {} - CompressionEstimate::AlwaysUse => return Ok(Some(scheme)), - CompressionEstimate::Ratio(ratio) => { - if is_better_ratio(ratio, &best) { - best = Some((scheme, ratio)); - } - } - e @ (CompressionEstimate::Sample | CompressionEstimate::Estimate(_)) => { - vortex_panic!( - "an estimation function returned an invalid variant {e:?}" - ) - } + CompressionEstimate::Deferred(DeferredEstimate::Callback(estimate_callback)) => { + let verdict = estimate_callback(self, data, ctx.clone())?; + if let Some(winner_estimate) = + Self::check_and_update_estimate_verdict(&mut best, scheme, verdict) + { + return Ok(Some((scheme, winner_estimate))); } } } - - // tracing::debug!(scheme = %scheme.id(), estimate, "evaluated compression ratio"); } - Ok(best.map(|(s, _)| s)) + Ok(best.map(|(scheme, ratio)| (scheme, WinnerEstimate::Ratio(ratio)))) + } + + /// Updates `best` from a terminal estimate verdict. + fn check_and_update_estimate_verdict( + best: &mut Option<(&'static dyn Scheme, f64)>, + scheme: &'static dyn Scheme, + verdict: EstimateVerdict, + ) -> Option<WinnerEstimate> { + match verdict { + EstimateVerdict::Skip => None, + EstimateVerdict::AlwaysUse => Some(WinnerEstimate::AlwaysUse), + EstimateVerdict::Ratio(ratio) => { + if is_better_ratio(ratio, &*best) { + *best = Some((scheme, ratio)); + } + None + } + } } // TODO(connor): Lots of room for optimization here. 
@@ -502,6 +522,8 @@ impl CascadingCompressor { #[cfg(test)] mod tests { + use vortex_array::ArrayRef; + use vortex_array::Canonical; use vortex_array::arrays::BoolArray; use vortex_array::arrays::Constant; use vortex_array::arrays::PrimitiveArray; @@ -513,12 +535,180 @@ mod tests { use crate::builtins::IntDictScheme; use crate::builtins::StringDictScheme; use crate::ctx::CompressorContext; + use crate::estimate::CompressionEstimate; + use crate::estimate::DeferredEstimate; + use crate::estimate::EstimateVerdict; use crate::scheme::SchemeExt; fn compressor() -> CascadingCompressor { CascadingCompressor::new(vec![&IntDictScheme, &FloatDictScheme, &StringDictScheme]) } + fn estimate_test_data() -> ArrayAndStats { + let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4], Validity::NonNullable).into_array(); + ArrayAndStats::new(array, GenerateStatsOptions::default()) + } + + fn matches_integer_primitive(canonical: &Canonical) -> bool { + matches!(canonical, Canonical::Primitive(primitive) if primitive.ptype().is_int()) + } + + #[derive(Debug)] + struct DirectRatioScheme; + + impl Scheme for DirectRatioScheme { + fn scheme_name(&self) -> &'static str { + "test.direct_ratio" + } + + fn matches(&self, canonical: &Canonical) -> bool { + matches_integer_primitive(canonical) + } + + fn expected_compression_ratio( + &self, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> CompressionEstimate { + CompressionEstimate::Verdict(EstimateVerdict::Ratio(2.0)) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> VortexResult<ArrayRef> { + unreachable!("test helper should never be selected for compression") + } + } + + #[derive(Debug)] + struct ImmediateAlwaysUseScheme; + + impl Scheme for ImmediateAlwaysUseScheme { + fn scheme_name(&self) -> &'static str { + "test.immediate_always_use" + } + + fn matches(&self, canonical: &Canonical) -> bool { + matches_integer_primitive(canonical) + } + + fn 
expected_compression_ratio( + &self, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> CompressionEstimate { + CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> VortexResult<ArrayRef> { + unreachable!("test helper should never be selected for compression") + } + } + + #[derive(Debug)] + struct CallbackAlwaysUseScheme; + + impl Scheme for CallbackAlwaysUseScheme { + fn scheme_name(&self) -> &'static str { + "test.callback_always_use" + } + + fn matches(&self, canonical: &Canonical) -> bool { + matches_integer_primitive(canonical) + } + + fn expected_compression_ratio( + &self, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new( + |_compressor, _data, _ctx| Ok(EstimateVerdict::AlwaysUse), + ))) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> VortexResult<ArrayRef> { + unreachable!("test helper should never be selected for compression") + } + } + + #[derive(Debug)] + struct CallbackSkipScheme; + + impl Scheme for CallbackSkipScheme { + fn scheme_name(&self) -> &'static str { + "test.callback_skip" + } + + fn matches(&self, canonical: &Canonical) -> bool { + matches_integer_primitive(canonical) + } + + fn expected_compression_ratio( + &self, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new( + |_compressor, _data, _ctx| Ok(EstimateVerdict::Skip), + ))) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> VortexResult<ArrayRef> { + unreachable!("test helper should never be selected for compression") + } + } + + #[derive(Debug)] + struct CallbackRatioScheme; + + impl Scheme for 
CallbackRatioScheme { + fn scheme_name(&self) -> &'static str { + "test.callback_ratio" + } + + fn matches(&self, canonical: &Canonical) -> bool { + matches_integer_primitive(canonical) + } + + fn expected_compression_ratio( + &self, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Callback(Box::new( + |_compressor, _data, _ctx| Ok(EstimateVerdict::Ratio(3.0)), + ))) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> VortexResult<ArrayRef> { + unreachable!("test helper should never be selected for compression") + } + } + #[test] fn test_self_exclusion() { let c = compressor(); @@ -568,6 +758,76 @@ mod tests { assert!(!c.is_excluded(&IntDictScheme, &ctx)); } + #[test] + fn immediate_always_use_wins_immediately() -> VortexResult<()> { + let compressor = + CascadingCompressor::new(vec![&DirectRatioScheme, &ImmediateAlwaysUseScheme]); + let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &ImmediateAlwaysUseScheme]; + let mut data = estimate_test_data(); + + let winner = + compressor.choose_best_scheme(&schemes, &mut data, CompressorContext::new())?; + + assert!(matches!( + winner, + Some((scheme, WinnerEstimate::AlwaysUse)) + if scheme.id() == ImmediateAlwaysUseScheme.id() + )); + Ok(()) + } + + #[test] + fn callback_always_use_wins_immediately() -> VortexResult<()> { + let compressor = + CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackAlwaysUseScheme]); + let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackAlwaysUseScheme]; + let mut data = estimate_test_data(); + + let winner = + compressor.choose_best_scheme(&schemes, &mut data, CompressorContext::new())?; + + assert!(matches!( + winner, + Some((scheme, WinnerEstimate::AlwaysUse)) + if scheme.id() == CallbackAlwaysUseScheme.id() + )); + Ok(()) + } + + #[test] + fn callback_skip_is_ignored() -> VortexResult<()> { + let 
compressor = CascadingCompressor::new(vec![&CallbackSkipScheme, &DirectRatioScheme]); + let schemes: [&'static dyn Scheme; 2] = [&CallbackSkipScheme, &DirectRatioScheme]; + let mut data = estimate_test_data(); + + let winner = + compressor.choose_best_scheme(&schemes, &mut data, CompressorContext::new())?; + + assert!(matches!( + winner, + Some((scheme, WinnerEstimate::Ratio(2.0))) + if scheme.id() == DirectRatioScheme.id() + )); + Ok(()) + } + + #[test] + fn callback_ratio_competes_numerically() -> VortexResult<()> { + let compressor = CascadingCompressor::new(vec![&DirectRatioScheme, &CallbackRatioScheme]); + let schemes: [&'static dyn Scheme; 2] = [&DirectRatioScheme, &CallbackRatioScheme]; + let mut data = estimate_test_data(); + + let winner = + compressor.choose_best_scheme(&schemes, &mut data, CompressorContext::new())?; + + assert!(matches!( + winner, + Some((scheme, WinnerEstimate::Ratio(3.0))) + if scheme.id() == CallbackRatioScheme.id() + )); + Ok(()) + } + #[test] fn all_null_array_compresses_to_constant() -> VortexResult<()> { let array = PrimitiveArray::new( diff --git a/vortex-compressor/src/estimate.rs b/vortex-compressor/src/estimate.rs index b1e3ae3c659..957b54e57a7 100644 --- a/vortex-compressor/src/estimate.rs +++ b/vortex-compressor/src/estimate.rs @@ -18,35 +18,43 @@ use crate::sample::sample_count_approx_one_percent; use crate::scheme::Scheme; use crate::stats::ArrayAndStats; -/// Closure type for [`CompressionEstimate::Estimate`]. The compressor calls this with the same -/// arguments it would pass to sampling. +/// Closure type for [`DeferredEstimate::Callback`]. +/// +/// The compressor calls this with the same arguments it would pass to sampling. The closure must +/// resolve directly to a terminal [`EstimateVerdict`]. 
#[rustfmt::skip] pub type EstimateFn = dyn FnOnce( &CascadingCompressor, &mut ArrayAndStats, CompressorContext, - ) -> VortexResult<CompressionEstimate> + ) -> VortexResult<EstimateVerdict> + Send + Sync; -// TODO(connor): We should make use of the fact that some checks are cheap and some checks are -// expensive (sample or estimate variants). /// The result of a [`Scheme`]'s compression ratio estimation. /// /// This type is returned by [`Scheme::expected_compression_ratio`] to tell the compressor how /// promising this scheme is for a given array without performing any expensive work. /// -/// All expensive or fallible operations (sampling, trial encoding) are deferred to the compressor -/// via the [`Sample`](CompressionEstimate::Sample) and [`Estimate`](CompressionEstimate::Estimate) -/// variants. -/// -/// [`Sample`]: CompressionEstimate::Sample -/// [`Estimate`]: CompressionEstimate::Estimate +/// [`CompressionEstimate::Verdict`] means the scheme already knows the terminal answer. +/// [`CompressionEstimate::Deferred`] means the compressor must do extra work before the scheme can +/// produce a terminal answer. +#[derive(Debug)] pub enum CompressionEstimate { + /// The scheme already knows the terminal estimation verdict. + Verdict(EstimateVerdict), + + /// The compressor must perform deferred work to resolve the terminal estimation verdict. + Deferred(DeferredEstimate), +} + +/// The terminal answer to a compression estimate request. +#[derive(Debug)] +pub enum EstimateVerdict { /// Do not use this scheme for this array. Skip, - /// Always use this scheme, as we know it is definitively the best choice. + /// Always use this scheme, as it is definitively the best choice. /// /// Some examples include constant detection, decimal byte parts, and temporal decomposition. /// @@ -60,21 +68,20 @@ pub enum CompressionEstimate { /// The estimated compression ratio. This must be greater than `1.0` to be considered by the /// compressor, otherwise it is worse than the canonical encoding. 
Ratio(f64), +} +/// Deferred work that can resolve to a terminal [`EstimateVerdict`]. +pub enum DeferredEstimate { /// The scheme cannot cheaply estimate its ratio, so the compressor should compress a small /// sample to determine effectiveness. Sample, - /// A fallible estimation requiring a custom expensive computation. The compressor will call the - /// closure and handle the result. + /// A fallible estimation requiring a custom expensive computation. /// /// Use this only when the scheme needs to perform trial encoding or other costly checks to - /// determine its compression ratio. - /// - /// The estimation function must **not** return a [`Sample`](CompressionEstimate::Sample) or - /// [`Estimate`](CompressionEstimate::Estimate) variant to ensure the estimation process is - /// bounded. - Estimate(Box<EstimateFn>), + /// determine its compression ratio. The callback returns an [`EstimateVerdict`] directly, so + /// it cannot request more sampling or another deferred callback. + Callback(Box<EstimateFn>), } /// Returns `true` if `ratio` is a valid compression ratio (> 1.0, finite, not subnormal) that @@ -138,14 +145,11 @@ pub(super) fn estimate_compression_ratio_with_sampling( Ok(ratio) } -impl fmt::Debug for CompressionEstimate { +impl fmt::Debug for DeferredEstimate { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - CompressionEstimate::Skip => write!(f, "Skip"), - CompressionEstimate::AlwaysUse => write!(f, "AlwaysUse"), - CompressionEstimate::Ratio(r) => f.debug_tuple("Ratio").field(r).finish(), - CompressionEstimate::Sample => write!(f, "Sample"), - CompressionEstimate::Estimate(_) => write!(f, "Estimate(..)"), + DeferredEstimate::Sample => write!(f, "Sample"), + DeferredEstimate::Callback(_) => write!(f, "Callback(..)"), } } } diff --git a/vortex-compressor/src/scheme.rs b/vortex-compressor/src/scheme.rs index 66b35051e95..c3f1760d373 100644 --- a/vortex-compressor/src/scheme.rs +++ b/vortex-compressor/src/scheme.rs @@ -129,10 +129,10 @@ pub struct 
AncestorExclusion { /// /// # Implementing a scheme /// -/// [`expected_compression_ratio`] should return [`CompressionEstimate::Sample`] when a cheap -/// heuristic is not available, asking the compressor to estimate via sampling. Implementors should -/// return a more specific variant when possible (e.g. [`CompressionEstimate::AlwaysUse`] for -/// constant detection or [`CompressionEstimate::Skip`] for early rejection based on stats). +/// [`expected_compression_ratio`] should return +/// `CompressionEstimate::Deferred(DeferredEstimate::Sample)` when a cheap heuristic is not +/// available, asking the compressor to estimate via sampling. Implementors should return an +/// immediate [`CompressionEstimate::Verdict`] when possible. /// /// Schemes that need statistics that may be expensive to compute should override [`stats_options`] /// to declare what they require. The compressor merges all eligible schemes' options before @@ -184,14 +184,21 @@ pub trait Scheme: Debug + Send + Sync { /// Cheaply estimate the compression ratio for this scheme on the given array. /// - /// This method should be fast and infallible. Any expensive or fallible work should be deferred - /// to the compressor by returning [`CompressionEstimate::Sample`] or - /// [`CompressionEstimate::Estimate`]. + /// This method should be fast and infallible. Any expensive or fallible work should be + /// deferred to the compressor by returning + /// `CompressionEstimate::Deferred(DeferredEstimate::Sample)` or + /// `CompressionEstimate::Deferred(DeferredEstimate::Callback(...))`. /// /// The compressor will ask all schemes what their expected compression ratio is given the array /// and statistics. The scheme with the highest estimated ratio will then be applied to the /// entire array. /// + /// [`CompressionEstimate::Verdict`] means the scheme already knows the terminal + /// [`crate::estimate::EstimateVerdict`]. 
`CompressionEstimate::Deferred(DeferredEstimate::Sample)` + /// asks the compressor to sample. `CompressionEstimate::Deferred(DeferredEstimate::Callback(...))` + /// asks the compressor to run custom deferred work. Deferred callbacks must return a + /// [`crate::estimate::EstimateVerdict`] directly, never another deferred request. + /// /// Note that the compressor will also use this method when compressing samples, so some /// statistics that might hold for the samples may not hold for the entire array (e.g., /// `Constant`). Implementations should check `ctx.is_sample` to make sure that they are diff --git a/vortex-tensor/src/encodings/turboquant/scheme.rs b/vortex-tensor/src/encodings/turboquant/scheme.rs index ba9b95dd1e0..b603f12e16e 100644 --- a/vortex-tensor/src/encodings/turboquant/scheme.rs +++ b/vortex-tensor/src/encodings/turboquant/scheme.rs @@ -29,6 +29,7 @@ use vortex_array::arrays::scalar_fn::ScalarFnArrayExt; use vortex_compressor::CascadingCompressor; use vortex_compressor::ctx::CompressorContext; use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::EstimateVerdict; use vortex_compressor::scheme::Scheme; use vortex_compressor::stats::ArrayAndStats; use vortex_error::VortexExpect; @@ -91,11 +92,11 @@ impl Scheme for TurboQuantScheme { .vortex_expect("invalid bit width for TurboQuant"); let dimension = vector_metadata.dimensions(); - CompressionEstimate::Ratio(estimate_compression_ratio( + CompressionEstimate::Verdict(EstimateVerdict::Ratio(estimate_compression_ratio( element_bit_width, dimension, len, - )) + ))) } fn compress(