From 7569b52d428091ac9e06c96dc6cfca9298d1e447 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 10 Apr 2026 03:01:00 +0000 Subject: [PATCH 1/4] feat(compressor): add structured tracing instrumentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instrument the cascading compressor with composable `tracing` spans and events so users can see what the compressor is doing, compare estimated and actual compression ratios, time individual phases, and surface previously-silent "compressed but the output grew" decisions. Four targets let users select one aspect at a time via `RUST_LOG`: - `vortex_compressor::cascade` — top-level + `compress_child` spans - `vortex_compressor::select` — scheme eligibility, evaluation, winner, and short-circuit reasons - `vortex_compressor::estimate` — sampling span and sample.collected / sample.result events - `vortex_compressor::encode` — per-scheme encode span and the scheme.compress_result event with estimated vs actual ratio + accepted Spans are at `trace` level so `tracing-perfetto` / `tracing-timing` / `tracing-opentelemetry` only materialize them on demand. Events are at `debug` for outcomes so `RUST_LOG=vortex_compressor::encode=debug` produces one readable summary line per leaf. New `tests/tracing.rs` uses a custom capture layer (not `TestWriter`) to pin the names and stable fields of the emitted events so downstream observability tooling does not break under rename. Instrumentation lives entirely in the orchestration layer (compressor.rs + estimate.rs); individual scheme implementations are untouched. The existing unstructured calls in estimate.rs and the stale commented-out line in compressor.rs are removed. A new `# Observability` section in the crate docs carries the full target / span / event reference with `RUST_LOG` recipes. Signed-off-by: Claude --- Cargo.lock | 1 + vortex-compressor/Cargo.toml | 3 +- vortex-compressor/src/compressor.rs | 261 ++++++++++++++++++++++++++-- vortex-compressor/src/estimate.rs | 72 ++++++-- vortex-compressor/src/lib.rs | 101 +++++++++++ vortex-compressor/tests/tracing.rs | 258 +++++++++++++++++++++++++++ 6 files changed, 663 insertions(+), 33 deletions(-) create mode 100644 vortex-compressor/tests/tracing.rs diff --git a/Cargo.lock b/Cargo.lock index 52f583dd2cc..52563e15a42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10323,6 +10323,7 @@ dependencies = [ "rstest", "rustc-hash", "tracing", + "tracing-subscriber", "vortex-array", "vortex-buffer", "vortex-error", diff --git a/vortex-compressor/Cargo.toml b/vortex-compressor/Cargo.toml index da9bd07889c..2d94679157a 100644 --- a/vortex-compressor/Cargo.toml +++ b/vortex-compressor/Cargo.toml @@ -19,7 +19,7 @@ num-traits = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } rustc-hash = { workspace = true } -tracing = { workspace = true } +tracing = { workspace = true, features = ["std", "attributes"] } vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-error = { workspace = true } @@ -29,6 +29,7 @@ vortex-utils = { workspace = true } [dev-dependencies] divan = { workspace = true } rstest = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter"] } vortex-array = { workspace = true, features = ["_test-harness"] } [lints] diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index d1d7cc28958..22a14aaf88e 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -56,6 +56,55 @@ const ROOT_SCHEME_ID: SchemeId = SchemeId { name: "vortex.compressor.root", }; +/// Tracing target for scheme selection events (eligibility, evaluation, winner, short-circuits). +/// +/// See the crate-level `Observability` section of [`crate`] for the full target taxonomy. +const TARGET_SELECT: &str = "vortex_compressor::select"; + +/// Tracing target for per-scheme encoding events (the `scheme.compress` span and the +/// `scheme.compress_result` event reporting estimated vs actual compression ratios). +const TARGET_ENCODE: &str = "vortex_compressor::encode"; + +/// Tracing target for cascade-tree events (top-level `compress` and `compress_child` spans, +/// cascade-exhausted short-circuits). +const TARGET_CASCADE: &str = "vortex_compressor::cascade"; + +/// Emits a structured `scheme.evaluated` trace event on [`TARGET_SELECT`] for one scheme's +/// initial estimation verdict. +/// +/// For `Ratio(r)` the numeric estimate is recorded directly. For `Sample` and `Estimate` +/// the ratio is not yet known at this point; a follow-up `scheme.evaluated.resolved` event +/// is emitted by the caller after the deferred computation finishes. +/// +/// Defined as a standalone helper (rather than inlined) because the `match` expression that +/// extracts `kind` and the optional `ratio` field is the only repetition worth factoring out +/// of [`CascadingCompressor::choose_best_scheme`]. +fn emit_scheme_evaluated(scheme: &'static dyn Scheme, estimate: &CompressionEstimate) { + let kind: &'static str = match estimate { + CompressionEstimate::Verdict(EstimateVerdict::Skip) => "Skip", + CompressionEstimate::Verdict(EstimateVerdict::AlwaysUse) => "AlwaysUse", + CompressionEstimate::Verdict(EstimateVerdict::Ratio(_)) => "Ratio", + CompressionEstimate::Deferred(DeferredEstimate::Sample) => "Sample", + CompressionEstimate::Deferred(DeferredEstimate::Callback(_)) => "Estimate", + }; + if let CompressionEstimate::Verdict(EstimateVerdict::Ratio(ratio)) = estimate { + tracing::trace!( + target: TARGET_SELECT, + scheme = %scheme.id(), + kind, + ratio = *ratio, + "scheme.evaluated", + ); + } else { + tracing::trace!( + target: TARGET_SELECT, + scheme = %scheme.id(), + kind, + "scheme.evaluated", + ); + } +} + /// Child indices for the compressor's list/listview compression. mod root_list_children { /// List/ListView offsets child. @@ -134,6 +183,17 @@ impl CascadingCompressor { /// # Errors /// /// Returns an error if canonicalization or compression fails. + #[tracing::instrument( + target = "vortex_compressor::cascade", + name = "CascadingCompressor::compress", + level = "trace", + skip_all, + fields( + len = array.len(), + nbytes = array.nbytes(), + dtype = %array.dtype(), + ), + )] pub fn compress(&self, array: &ArrayRef) -> VortexResult { let canonical = array .clone() @@ -162,7 +222,24 @@ impl CascadingCompressor { parent_id: SchemeId, child_index: usize, ) -> VortexResult { + let _span = tracing::trace_span!( + target: TARGET_CASCADE, + "compress_child", + parent = %parent_id, + child_index, + cascade_depth = parent_ctx.cascade_history().len(), + len = child.len(), + ) + .entered(); + if parent_ctx.finished_cascading() { + tracing::debug!( + target: TARGET_CASCADE, + reason = "cascade_exhausted", + parent = %parent_id, + child_index, + "short_circuit", + ); return Ok(child.clone()); } @@ -281,11 +358,28 @@ impl CascadingCompressor { /// [`matches`]: Scheme::matches /// [`stats_options`]: Scheme::stats_options /// [`choose_scheme`]: Self::choose_scheme + #[expect(clippy::cognitive_complexity, reason = "tracing")] fn choose_and_compress( &self, canonical: Canonical, ctx: CompressorContext, ) -> VortexResult { + // Capture span-facing metadata before we move `canonical` into an `ArrayRef`. + let len = canonical.len(); + let cascade_depth = ctx.cascade_history().len(); + + // `eligible_count` is recorded after filtering; pre-declare it so `Span::record` + // works. + let _span = tracing::trace_span!( + target: TARGET_SELECT, + "choose_and_compress", + dtype = %canonical.dtype(), + len, + cascade_depth, + eligible_count = tracing::field::Empty, + ) + .entered(); + let eligible_schemes: Vec<&'static dyn Scheme> = self .schemes .iter() @@ -293,18 +387,35 @@ impl CascadingCompressor { .filter(|s| s.matches(&canonical) && !self.is_excluded(*s, &ctx)) .collect(); + tracing::Span::current().record("eligible_count", eligible_schemes.len()); + let array: ArrayRef = canonical.into(); // If there are no schemes that we can compress into, then just return it uncompressed. if eligible_schemes.is_empty() { + tracing::debug!( + target: TARGET_SELECT, + reason = "no_schemes", + "short_circuit", + ); return Ok(array); } // Nothing to compress if empty or all-null. if array.is_empty() { + tracing::debug!( + target: TARGET_SELECT, + reason = "empty", + "short_circuit", + ); return Ok(array); } if array.all_invalid()? { + tracing::debug!( + target: TARGET_SELECT, + reason = "all_null", + "short_circuit", + ); return Ok( ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(), ); @@ -321,24 +432,108 @@ impl CascadingCompressor { let mut data = ArrayAndStats::new(array, merged_opts); - // TODO(connor): Add tracing support for logging the winner estimate. - if let Some((winner, _winner_estimate)) = + let Some((winner, winner_estimate)) = self.choose_best_scheme(&eligible_schemes, &mut data, ctx.clone())? - { - // Sampling and estimation chose a scheme, so let's compress the whole array with it. - let compressed = winner.compress(self, &mut data, ctx)?; + else { + // No scheme beat the canonical encoding. + tracing::debug!( + target: TARGET_SELECT, + reason = "fell_through", + candidate_count = eligible_schemes.len(), + "short_circuit", + ); + return Ok(data.into_array()); + }; + + match winner_estimate { + WinnerEstimate::Ratio(ratio) => tracing::debug!( + target: TARGET_SELECT, + scheme = %winner.id(), + estimated_ratio = ratio, + candidate_count = eligible_schemes.len(), + "scheme.winner", + ), + WinnerEstimate::AlwaysUse => tracing::debug!( + target: TARGET_SELECT, + scheme = %winner.id(), + always_use = true, + candidate_count = eligible_schemes.len(), + "scheme.winner", + ), + } - // TODO(connor): Add a tracing warning here if compression with the chosen scheme - // failed, since there was likely more we could have done while choosing schemes. + // Wrap the actual encode in its own span so tracing-perfetto / + // tracing-timing get a distinct timing frame per scheme compression. + let compressed = { + let _encode_span = tracing::trace_span!( + target: TARGET_ENCODE, + "scheme.compress", + scheme = %winner.id(), + before_nbytes, + ) + .entered(); + winner.compress(self, &mut data, ctx)? + }; + + let after_nbytes = compressed.nbytes(); + // Guard against division by zero: a zero-byte output is legal (e.g. constant + // arrays) so we clamp to 1 for the display ratio rather than emit NaN/Inf. + let actual_ratio = before_nbytes as f64 / after_nbytes.max(1) as f64; + let accepted = after_nbytes < before_nbytes; + + match winner_estimate { + WinnerEstimate::Ratio(ratio) => tracing::debug!( + target: TARGET_ENCODE, + scheme = %winner.id(), + before_nbytes, + after_nbytes, + estimated_ratio = ratio, + actual_ratio, + accepted, + "scheme.compress_result", + ), + WinnerEstimate::AlwaysUse => tracing::debug!( + target: TARGET_ENCODE, + scheme = %winner.id(), + before_nbytes, + after_nbytes, + always_use = true, + actual_ratio, + accepted, + "scheme.compress_result", + ), + } - // Only choose the compressed array if it is smaller than the canonical one. - if compressed.nbytes() < before_nbytes { - // TODO(connor): Add a tracing warning here too. - return Ok(compressed); - } + if accepted { + return Ok(compressed); + } + + // Winner was picked but its output was not smaller than the canonical input. + // This is silent in the old code and hides real compressor bugs (bad estimate, + // pathological data). Surface it explicitly. + match winner_estimate { + WinnerEstimate::Ratio(ratio) => tracing::debug!( + target: TARGET_SELECT, + reason = "larger_output", + scheme = %winner.id(), + before_nbytes, + after_nbytes, + estimated_ratio = ratio, + actual_ratio, + "short_circuit", + ), + WinnerEstimate::AlwaysUse => tracing::debug!( + target: TARGET_SELECT, + reason = "larger_output", + scheme = %winner.id(), + before_nbytes, + after_nbytes, + always_use = true, + actual_ratio, + "short_circuit", + ), } - // No scheme improved on the original. Ok(data.into_array()) } @@ -347,6 +542,7 @@ impl CascadingCompressor { /// registration order (earlier in the list wins). /// /// [`expected_compression_ratio`]: Scheme::expected_compression_ratio + #[expect(clippy::cognitive_complexity, reason = "tracing")] fn choose_best_scheme( &self, schemes: &[&'static dyn Scheme], @@ -360,8 +556,12 @@ impl CascadingCompressor { for &scheme in schemes { let estimate = scheme.expected_compression_ratio(data, ctx.clone()); - // TODO(connor): Rather than computing the deferred estimates eagerly, it would be - // better to look at all quick estimates and see if it makes sense to sample at all. + // Emit the initial estimate verdict for every scheme the compressor looks at. + // For `Ratio` this carries the numeric estimate directly; for deferred estimates the + // ratio is unknown at this point and will be reported via a follow-up + // `scheme.evaluated.resolved` event once computed. + emit_scheme_evaluated(scheme, &estimate); + match estimate { CompressionEstimate::Verdict(verdict) => { if let Some(winner_estimate) = @@ -378,12 +578,43 @@ impl CascadingCompressor { ctx.clone(), )?; + tracing::trace!( + target: TARGET_SELECT, + scheme = %scheme.id(), + kind = "Sample", + ratio = sample_ratio, + "scheme.evaluated.resolved", + ); + if is_better_ratio(sample_ratio, &best) { best = Some((scheme, sample_ratio)); } } CompressionEstimate::Deferred(DeferredEstimate::Callback(estimate_callback)) => { let verdict = estimate_callback(self, data, ctx.clone())?; + let resolved_kind = match verdict { + EstimateVerdict::Skip => "Skip", + EstimateVerdict::AlwaysUse => "AlwaysUse", + EstimateVerdict::Ratio(_) => "Ratio", + }; + if let EstimateVerdict::Ratio(ratio) = verdict { + tracing::trace!( + target: TARGET_SELECT, + scheme = %scheme.id(), + kind = "Estimate", + resolved_kind, + ratio, + "scheme.evaluated.resolved", + ); + } else { + tracing::trace!( + target: TARGET_SELECT, + scheme = %scheme.id(), + kind = "Estimate", + resolved_kind, + "scheme.evaluated.resolved", + ); + } if let Some(winner_estimate) = Self::check_and_update_estimate_verdict(&mut best, scheme, verdict) { diff --git a/vortex-compressor/src/estimate.rs b/vortex-compressor/src/estimate.rs index 957b54e57a7..ce6147f0b7e 100644 --- a/vortex-compressor/src/estimate.rs +++ b/vortex-compressor/src/estimate.rs @@ -16,8 +16,18 @@ use crate::sample::SAMPLE_SIZE; use crate::sample::sample; use crate::sample::sample_count_approx_one_percent; use crate::scheme::Scheme; +use crate::scheme::SchemeExt; use crate::stats::ArrayAndStats; +/// Tracing target for sampling-based ratio estimation (sample sizing and sample compression +/// results). See the crate-level `Observability` section of [`crate`] for the full taxonomy. +const TARGET_ESTIMATE: &str = "vortex_compressor::estimate"; + +/// Tracing target for the sub-span covering the sample compression itself. Shared with +/// [`crate::compressor`] so that users filtering on `vortex_compressor::encode` see both the +/// final encode and any sample encodes that fed into its selection. +const TARGET_ENCODE: &str = "vortex_compressor::encode"; + /// Closure type for [`DeferredEstimate::Callback`]. /// /// The compressor calls this with the same arguments it would pass to sampling. The closure must @@ -104,16 +114,29 @@ pub(super) fn estimate_compression_ratio_with_sampling( array: &ArrayRef, ctx: CompressorContext, ) -> VortexResult { + let _span = tracing::trace_span!( + target: TARGET_ESTIMATE, + "estimate.sample", + scheme = %scheme.id(), + source_len = array.len(), + ) + .entered(); + let sample_array = if ctx.is_sample() { array.clone() } else { let source_len = array.len(); let sample_count = sample_count_approx_one_percent(source_len); + let sampled_len = u64::from(SAMPLE_SIZE) * u64::from(sample_count); tracing::trace!( - "Sampling {} values out of {}", - SAMPLE_SIZE as u64 * sample_count as u64, - source_len + target: TARGET_ESTIMATE, + scheme = %scheme.id(), + sample_count, + sample_size = SAMPLE_SIZE, + sampled_len, + source_len = source_len as u64, + "sample.collected", ); // `ArrayAndStats` expects a canonical array (so that it can easily compute lazy stats). @@ -125,22 +148,37 @@ pub(super) fn estimate_compression_ratio_with_sampling( let mut sample_data = ArrayAndStats::new(sample_array, scheme.stats_options()); let sample_ctx = ctx.with_sampling(); - let after = scheme - .compress(compressor, &mut sample_data, sample_ctx)? - .nbytes(); - let before = sample_data.array().nbytes(); - - // TODO(connor): Issue https://github.com/vortex-data/vortex/issues/7268. - // if after == 0 { - // tracing::warn!( - // scheme = %scheme.id(), - // "sample compressed to 0 bytes, which should only happen for constant arrays", - // ); - // } + // Wrap the sample compression in its own encode span so that timing subscribers + // (tracing-perfetto / tracing-timing) can attribute sampling cost separately from the + // final full-array compression. + let after = { + let _sample_encode = tracing::trace_span!( + target: TARGET_ENCODE, + "sample.compress", + scheme = %scheme.id(), + ) + .entered(); + scheme.compress(compressor, &mut sample_data, sample_ctx)? + } + .nbytes(); - let ratio = before as f64 / after as f64; + let before = sample_data.array().nbytes(); - tracing::debug!("estimate_compression_ratio_with_sampling(compressor={scheme:#?}) = {ratio}",); + // TODO(connor): Issue https://github.com/vortex-data/vortex/issues/7268. Sample compressing + // to 0 bytes should only happen for constant arrays; anything else is a scheme bug. + + // Guard against division by zero: zero-byte samples are legal (constant arrays). Clamp + // to 1 so the ratio remains finite rather than emitting `inf`/`nan`. + let ratio = before as f64 / after.max(1) as f64; + + tracing::debug!( + target: TARGET_ESTIMATE, + scheme = %scheme.id(), + sampled_before = before, + sampled_after = after, + sampled_ratio = ratio, + "sample.result", + ); Ok(ratio) } diff --git a/vortex-compressor/src/lib.rs b/vortex-compressor/src/lib.rs index 65fd3f09c56..e25620a2836 100644 --- a/vortex-compressor/src/lib.rs +++ b/vortex-compressor/src/lib.rs @@ -15,6 +15,107 @@ //! //! This crate contains no encoding dependencies. Batteries-included compressors are provided by //! downstream crates like `vortex-btrblocks`, which register different encodings to the compressor. +//! +//! # Observability +//! +//! The compressor emits structured `tracing` spans and events through four independent +//! targets. Pick one with `RUST_LOG` to study a single aspect of the compressor at a time, +//! or combine them. No subscriber is installed by this crate; the caller does that. +//! +//! | Target | What it covers | +//! |---------------------------------|-----------------------------------------------------------------------------------| +//! | `vortex_compressor::cascade` | Top-level `compress` and `compress_child` spans — the cascade-tree shape. | +//! | `vortex_compressor::select` | Scheme eligibility, per-scheme evaluation, winner, and short-circuit reasons. | +//! | `vortex_compressor::estimate` | Sampling: sample sizing, sample compression, and the resulting estimated ratio. | +//! | `vortex_compressor::encode` | The winner's encode span and its estimated-vs-actual `scheme.compress_result`. | +//! +//! ## Recipes +//! +//! Summary of each leaf (which scheme won, estimated vs actual ratio, accepted?): +//! +//! ```text +//! RUST_LOG=vortex_compressor::encode=debug cargo test -p vortex-btrblocks +//! ``` +//! +//! Every scheme evaluated for every leaf, with estimate kind and ratio: +//! +//! ```text +//! RUST_LOG=vortex_compressor::select=trace cargo test -p vortex-btrblocks +//! ``` +//! +//! Sample sizes and sample compression results: +//! +//! ```text +//! RUST_LOG=vortex_compressor::estimate=trace cargo test -p vortex-btrblocks +//! ``` +//! +//! Cascade tree (which scheme cascaded into which child): +//! +//! ```text +//! RUST_LOG=vortex_compressor::cascade=debug cargo test -p vortex-btrblocks +//! ``` +//! +//! Everything (firehose): +//! +//! ```text +//! RUST_LOG=vortex_compressor=trace cargo test -p vortex-btrblocks +//! ``` +//! +//! Combine targets: +//! +//! ```text +//! RUST_LOG=vortex_compressor::encode=debug,vortex_compressor::estimate=debug cargo run ... +//! ``` +//! +//! ## Span inventory +//! +//! | Span | Target | Level | Key fields | +//! |---------------------------------|-----------|-------|---------------------------------------------------------| +//! | `CascadingCompressor::compress` | cascade | trace | `len`, `nbytes`, `dtype` | +//! | `compress_child` | cascade | trace | `parent`, `child_index`, `cascade_depth`, `len` | +//! | `choose_and_compress` | select | trace | `dtype`, `len`, `cascade_depth`, `eligible_count` | +//! | `estimate.sample` | estimate | trace | `scheme`, `source_len` | +//! | `scheme.compress` | encode | trace | `scheme`, `before_nbytes` | +//! | `sample.compress` | encode | trace | `scheme` | +//! +//! ## Event inventory +//! +//! | Event | Target | Level | Fields | +//! |-----------------------------|-------------------|-------|-------------------------------------------------------------------------------------------------| +//! | `scheme.evaluated` | select | trace | `scheme`, `kind`, `ratio` (Option) | +//! | `scheme.evaluated.resolved` | select | trace | `scheme`, `kind`, `resolved_kind`?, `ratio`? | +//! | `scheme.winner` | select | debug | `scheme`, `estimated_ratio`, `candidate_count` | +//! | `scheme.compress_result` | encode | debug | `scheme`, `before_nbytes`, `after_nbytes`, `estimated_ratio`, `actual_ratio`, `accepted` | +//! | `sample.collected` | estimate | trace | `scheme`, `sample_count`, `sample_size`, `sampled_len`, `source_len` | +//! | `sample.result` | estimate | debug | `scheme`, `sampled_before`, `sampled_after`, `sampled_ratio` | +//! | `short_circuit` | select / cascade | debug | `reason` (`cascade_exhausted` \| `no_schemes` \| `empty` \| `all_null` \| `fell_through` \| `larger_output`), scheme?/parent? | +//! +//! An `estimated_ratio` of [`f64::INFINITY`] indicates a scheme that returned +//! [`CompressionEstimate::AlwaysUse`](estimate::CompressionEstimate::AlwaysUse). +//! +//! Field names are considered stable and are meant to be matched directly by downstream +//! observability tooling. This means `tracing-opentelemetry`, `tracing-perfetto`, and +//! `tracing-timing` subscribers work with no adapter code — attach a layer in your binary's +//! subscriber registry and the spans/events will be captured. +//! +//! ## Plugging in a subscriber +//! +//! A minimal stderr-only setup: +//! +//! ```rust,ignore +//! use tracing_subscriber::EnvFilter; +//! use tracing_subscriber::layer::SubscriberExt; +//! use tracing_subscriber::util::SubscriberInitExt; +//! +//! tracing_subscriber::registry() +//! .with(EnvFilter::from_default_env()) +//! .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) +//! .init(); +//! ``` +//! +//! To capture timings or export to a collector, add a second layer +//! (`tracing_perfetto::PerfettoLayer`, `tracing_timing::Builder`, +//! `tracing_opentelemetry::layer(...)`) to the registry. pub mod builtins; pub mod ctx; diff --git a/vortex-compressor/tests/tracing.rs b/vortex-compressor/tests/tracing.rs new file mode 100644 index 00000000000..ee479be1b15 --- /dev/null +++ b/vortex-compressor/tests/tracing.rs @@ -0,0 +1,258 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +// Integration tests in `tests/` are their own test crate; clippy's "outside test module" +// lint does not apply here. See https://github.com/rust-lang/rust-clippy/issues/11024. +#![allow(clippy::tests_outside_test_module)] +// Tests may panic or unwrap freely — this is the standard relaxation for test code. +#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] + +//! Integration tests for `vortex-compressor`'s tracing instrumentation. +//! +//! These tests pin the names and stable fields of the events emitted by the compressor by +//! attaching an in-memory capture layer and compressing a small array. They exist to: +//! +//! 1. Catch accidental rename or deletion of observability events that downstream tooling +//! (dashboards, alerting, perfetto recipes) depends on. +//! 2. Document, by example, what an end-to-end compression produces in the trace stream. +//! +//! The capture layer records structured fields instead of formatted strings, so assertions +//! are against typed values rather than substring matches. + +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::sync::Arc; + +use parking_lot::Mutex; +use tracing::Event; +use tracing::Subscriber; +use tracing::field::Field; +use tracing::field::Visit; +use tracing_subscriber::Layer; +use tracing_subscriber::layer::Context; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::registry::Registry; +use vortex_array::IntoArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::validity::Validity; +use vortex_buffer::Buffer; +use vortex_compressor::CascadingCompressor; +use vortex_compressor::builtins::IntDictScheme; + +/// A captured tracing event: its target, name, and structured fields. +#[derive(Debug, Clone)] +struct CapturedEvent { + target: String, + name: String, + fields: BTreeMap, +} + +impl CapturedEvent { + fn field(&self, key: &str) -> Option<&str> { + self.fields.get(key).map(String::as_str) + } +} + +/// `Visit` implementation that records field values into a `BTreeMap`. +/// +/// We normalize every value type to its Debug representation so assertions are homogeneous, +/// e.g. `event.field("accepted") == Some("true")`. +#[derive(Default)] +struct FieldVisitor { + fields: BTreeMap, +} + +impl Visit for FieldVisitor { + fn record_debug(&mut self, field: &Field, value: &dyn Debug) { + self.fields + .insert(field.name().to_string(), format!("{value:?}")); + } + + fn record_str(&mut self, field: &Field, value: &str) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + + fn record_i64(&mut self, field: &Field, value: i64) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + + fn record_u64(&mut self, field: &Field, value: u64) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + + fn record_f64(&mut self, field: &Field, value: f64) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + + fn record_bool(&mut self, field: &Field, value: bool) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } +} + +/// A `tracing_subscriber::Layer` that pushes every event into a shared `Vec`. +/// +/// Use [`install_capture_layer`] to attach it to a per-test subscriber. +#[derive(Clone, Default)] +struct CaptureLayer { + events: Arc>>, +} + +impl Layer for CaptureLayer { + fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { + let mut visitor = FieldVisitor::default(); + event.record(&mut visitor); + + let meta = event.metadata(); + // `tracing` stores the event "name" under the reserved `message` field. + let name = visitor + .fields + .remove("message") + .unwrap_or_else(|| meta.name().to_string()); + + self.events.lock().push(CapturedEvent { + target: meta.target().to_string(), + name, + fields: visitor.fields, + }); + } +} + +/// Installs a fresh capture layer as the thread-local default subscriber and returns the +/// shared buffer plus a dispatch guard that restores the previous default when dropped. +/// +/// Cargo runs tests in parallel, so we deliberately never touch the global default. +fn install_capture_layer() -> ( + Arc>>, + tracing::dispatcher::DefaultGuard, +) { + let layer = CaptureLayer::default(); + let events = Arc::clone(&layer.events); + let subscriber = Registry::default().with(layer); + let guard = tracing::subscriber::set_default(subscriber); + (events, guard) +} + +/// Compressing a low-cardinality integer array must emit exactly one `scheme.winner` +/// followed by one `scheme.compress_result` for `vortex.int.dict`, with `accepted = true`. +#[test] +fn winner_and_compress_result_events_emitted() { + let (events, _guard) = install_capture_layer(); + + // 100 rows alternating between three distinct values — easily dict-encodable. + let values: Vec = (0..100).map(|i| i % 3).collect(); + let array = PrimitiveArray::new(Buffer::from_iter(values), Validity::NonNullable).into_array(); + + let compressor = CascadingCompressor::new(vec![&IntDictScheme]); + let _compressed = compressor.compress(&array).unwrap(); + + let events = events.lock(); + let winners: Vec<_> = events + .iter() + .filter(|e| e.name == "scheme.winner") + .collect(); + assert_eq!(winners.len(), 1, "expected exactly one scheme.winner event"); + assert_eq!(winners[0].field("scheme"), Some("vortex.int.dict")); + assert_eq!(winners[0].target, "vortex_compressor::select"); + + let results: Vec<_> = events + .iter() + .filter(|e| e.name == "scheme.compress_result") + .collect(); + assert_eq!( + results.len(), + 1, + "expected exactly one scheme.compress_result event" + ); + let result = results[0]; + assert_eq!(result.field("scheme"), Some("vortex.int.dict")); + assert_eq!(result.field("accepted"), Some("true")); + assert_eq!(result.target, "vortex_compressor::encode"); + // Sanity-check that the numeric fields exist and parse. + assert!( + result + .field("before_nbytes") + .unwrap() + .parse::() + .is_ok() + ); + assert!(result.field("after_nbytes").unwrap().parse::().is_ok()); + assert!(result.field("actual_ratio").unwrap().parse::().is_ok()); +} + +/// An empty array must take the `empty` short-circuit path before touching any scheme. +#[test] +fn short_circuit_empty_array_emits_reason_empty() { + let (events, _guard) = install_capture_layer(); + + let array = PrimitiveArray::new(Buffer::::empty(), Validity::NonNullable).into_array(); + + let compressor = CascadingCompressor::new(vec![&IntDictScheme]); + let _compressed = compressor.compress(&array).unwrap(); + + let short_circuits: Vec<_> = events + .lock() + .iter() + .filter(|e| e.name == "short_circuit") + .cloned() + .collect(); + + assert!( + short_circuits + .iter() + .any(|e| e.field("reason") == Some("empty")), + "expected a short_circuit event with reason=empty, got: {short_circuits:#?}", + ); +} + +/// A compression that uses a scheme returning `Sample` must emit `sample.collected` +/// followed by `sample.result`, both with finite positive ratios and matching scheme names. +/// +/// `StringDictScheme` defers to `CompressionEstimate::Sample`, so compressing a large +/// low-cardinality string column exercises the sampling path end to end. +#[test] +fn sampling_emits_sample_collected_and_result() { + use vortex_array::arrays::VarBinArray; + use vortex_compressor::builtins::StringDictScheme; + + let (events, _guard) = install_capture_layer(); + + // A large string column with very low cardinality — forces Sample to run on a large + // enough input that `sample_count_approx_one_percent` produces more than one slice. + let strs: Vec<&str> = (0..10_000) + .map(|i| ["red", "green", "blue"][i % 3]) + .collect(); + let array = VarBinArray::from(strs).into_array(); + + let compressor = CascadingCompressor::new(vec![&StringDictScheme]); + let _compressed = compressor.compress(&array).unwrap(); + + let events = events.lock(); + let collected_idx = events + .iter() + .position(|e| e.name == "sample.collected") + .expect("expected a sample.collected event"); + let result_idx = events + .iter() + .position(|e| e.name == "sample.result") + .expect("expected a sample.result event"); + + assert!( + collected_idx < result_idx, + "sample.collected should precede sample.result", + ); + + let result = &events[result_idx]; + assert_eq!(result.target, "vortex_compressor::estimate"); + assert_eq!(result.field("scheme"), Some("vortex.string.dict")); + let ratio: f64 = result + .field("sampled_ratio") + .expect("sampled_ratio field") + .parse() + .expect("sampled_ratio parses as f64"); + assert!(ratio.is_finite() && ratio > 0.0, "ratio = {ratio}"); +} From e634e315ef2d9eba4b08ea1dace54b2bd04f2a70 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 10 Apr 2026 03:10:07 +0000 Subject: [PATCH 2/4] feat(btrblocks): add tracing entry span, drop stray zigzag debug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instrument `BtrBlocksCompressor::compress` with a `#[tracing::instrument]` on the `vortex_compressor::cascade` target so downstream trace consumers (tracing-perfetto, tracing-opentelemetry) get a distinct BtrBlocks entry frame nested above the generic `CascadingCompressor::compress` pipeline span. Also delete the stray `tracing::debug!("zigzag output: {}", ...)` line in `schemes/integer.rs` — it predates the centralized `scheme.compress_result` event and is now redundant. Add a short `# Observability` section to the crate docs pointing at `vortex_compressor`'s full reference, plus one recipe. Signed-off-by: Claude --- vortex-btrblocks/Cargo.toml | 2 +- vortex-btrblocks/src/canonical_compressor.rs | 18 ++++++++++++++++++ vortex-btrblocks/src/lib.rs | 17 +++++++++++++++++ vortex-btrblocks/src/schemes/integer.rs | 4 +++- 4 files changed, 39 insertions(+), 2 deletions(-) diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index dab4f27fb51..74e88a9b1b9 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -19,7 +19,7 @@ num-traits = { workspace = true } pco = { workspace = true, optional = true } rand = { workspace = true } rustc-hash = { workspace = true } -tracing = { workspace = true } +tracing = { workspace = true, features = ["std", "attributes"] } vortex-alp = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } diff --git a/vortex-btrblocks/src/canonical_compressor.rs b/vortex-btrblocks/src/canonical_compressor.rs index 885178dd22d..98c265caa89 100644 --- a/vortex-btrblocks/src/canonical_compressor.rs +++ b/vortex-btrblocks/src/canonical_compressor.rs @@ -38,6 +38,24 @@ pub struct BtrBlocksCompressor( impl BtrBlocksCompressor { /// Compresses an array using BtrBlocks-inspired compression. + /// + /// This call is instrumented with a `vortex_compressor::cascade` span named + /// `BtrBlocksCompressor::compress` so that downstream tracing consumers + /// (e.g. `tracing-perfetto`) have a distinct entry frame to pivot on, nested + /// above the generic [`CascadingCompressor::compress`] span that actually + /// runs the pipeline. See the `Observability` section of the + /// [`vortex_compressor`] crate docs for the full tracing reference. + #[tracing::instrument( + target = "vortex_compressor::cascade", + name = "BtrBlocksCompressor::compress", + level = "trace", + skip_all, + fields( + len = array.len(), + nbytes = array.nbytes(), + dtype = %array.dtype(), + ), + )] pub fn compress(&self, array: &ArrayRef) -> VortexResult { self.0.compress(array) } diff --git a/vortex-btrblocks/src/lib.rs b/vortex-btrblocks/src/lib.rs index 1f6d5a2d816..fd74b80c7f2 100644 --- a/vortex-btrblocks/src/lib.rs +++ b/vortex-btrblocks/src/lib.rs @@ -52,6 +52,23 @@ //! .build(); //! ``` //! +//! # Observability +//! +//! [`BtrBlocksCompressor`] participates in the [`vortex_compressor`] tracing target system. +//! See the [`vortex_compressor` crate docs](vortex_compressor#observability) for the full +//! reference on targets, spans, and events. +//! +//! The top-level [`BtrBlocksCompressor::compress`] call adds its own +//! `vortex_compressor::cascade` span (named `BtrBlocksCompressor::compress`) that nests +//! above the generic cascading-compressor pipeline, giving downstream trace consumers a +//! distinct entry frame. +//! +//! Quick start — one line per leaf with scheme, estimated ratio, actual ratio, accepted?: +//! +//! ```text +//! RUST_LOG=vortex_compressor::encode=debug cargo test -p vortex-btrblocks +//! ``` +//! //! [BtrBlocks]: https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf mod builder; diff --git a/vortex-btrblocks/src/schemes/integer.rs b/vortex-btrblocks/src/schemes/integer.rs index 47fc52225aa..783bb382be9 100644 --- a/vortex-btrblocks/src/schemes/integer.rs +++ b/vortex-btrblocks/src/schemes/integer.rs @@ -294,7 +294,9 @@ impl Scheme for ZigZagScheme { let compressed = compressor.compress_child(&encoded.into_array(), &ctx, self.id(), 0)?; - tracing::debug!("zigzag output: {}", compressed.encoding_id()); + // NOTE: scheme-level compression results are emitted centrally as the + // `scheme.compress_result` event on the `vortex_compressor::encode` + // target. See the `Observability` section of the `vortex_compressor` crate docs. Ok(ZigZag::try_new(compressed)?.into_array()) } From 2d77ca84333b0fe01a3a6d73f5ddf07d6eb6e71f Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Fri, 10 Apr 2026 11:32:56 -0400 Subject: [PATCH 3/4] add json output Signed-off-by: Connor Tsui --- benchmarks/compress-bench/src/main.rs | 3 +- benchmarks/datafusion-bench/src/main.rs | 3 +- benchmarks/duckdb-bench/src/main.rs | 3 +- benchmarks/lance-bench/src/main.rs | 3 +- benchmarks/random-access-bench/src/main.rs | 3 +- vortex-bench/Cargo.toml | 1 + vortex-bench/src/bin/data-gen.rs | 8 +- vortex-bench/src/conversions.rs | 7 +- vortex-bench/src/utils/logging.rs | 90 +++++++++++++++++----- vortex-compressor/src/compressor.rs | 4 +- 10 files changed, 98 insertions(+), 27 deletions(-) diff --git a/benchmarks/compress-bench/src/main.rs b/benchmarks/compress-bench/src/main.rs index 26b71f1abe3..0e53d7f9b9b 100644 --- a/benchmarks/compress-bench/src/main.rs +++ b/benchmarks/compress-bench/src/main.rs @@ -15,6 +15,7 @@ use regex::Regex; use vortex::utils::aliases::hash_map::HashMap; use vortex_bench::Engine; use vortex_bench::Format; +use vortex_bench::LogFormat; use vortex_bench::Target; use vortex_bench::compress::CompressMeasurements; use vortex_bench::compress::CompressOp; @@ -75,7 +76,7 @@ struct Args { async fn main() -> anyhow::Result<()> { let args = Args::parse(); - setup_logging_and_tracing(args.verbose, args.tracing)?; + setup_logging_and_tracing(args.verbose, args.tracing, LogFormat::Text)?; run_compress( args.iterations, diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs index ad0df8ea74a..9e28e745410 100644 --- a/benchmarks/datafusion-bench/src/main.rs +++ b/benchmarks/datafusion-bench/src/main.rs @@ -32,6 +32,7 @@ use vortex_bench::BenchmarkArg; use vortex_bench::CompactionStrategy; use vortex_bench::Engine; use vortex_bench::Format; +use vortex_bench::LogFormat; use vortex_bench::Opt; use vortex_bench::Opts; use vortex_bench::SESSION; @@ -110,7 +111,7 @@ async fn main() -> anyhow::Result<()> { let opts = Opts::from(args.options); set_join_set_tracer(get_static_tracer())?; - setup_logging_and_tracing(args.verbose, args.tracing)?; + setup_logging_and_tracing(args.verbose, args.tracing, LogFormat::Text)?; let benchmark = create_benchmark(args.benchmark, &opts)?; diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs index 7ab8f1ac7ab..fb18574ad1c 100644 --- a/benchmarks/duckdb-bench/src/main.rs +++ b/benchmarks/duckdb-bench/src/main.rs @@ -14,6 +14,7 @@ use vortex_bench::BenchmarkArg; use vortex_bench::CompactionStrategy; use vortex_bench::Engine; use vortex_bench::Format; +use vortex_bench::LogFormat; use vortex_bench::Opt; use vortex_bench::Opts; use vortex_bench::conversions::convert_parquet_directory_to_vortex; @@ -87,7 +88,7 @@ fn main() -> anyhow::Result<()> { let args = Args::parse(); let opts = Opts::from(args.options); - setup_logging_and_tracing(args.verbose, args.tracing)?; + setup_logging_and_tracing(args.verbose, args.tracing, LogFormat::Text)?; let benchmark = create_benchmark(args.benchmark, &opts)?; diff --git a/benchmarks/lance-bench/src/main.rs b/benchmarks/lance-bench/src/main.rs index 73fa8426ffe..e2d51ba80b3 100644 --- a/benchmarks/lance-bench/src/main.rs +++ b/benchmarks/lance-bench/src/main.rs @@ -18,6 +18,7 @@ use vortex_bench::Benchmark; use vortex_bench::BenchmarkArg; use vortex_bench::Engine; use vortex_bench::Format; +use vortex_bench::LogFormat; use vortex_bench::Opt; use vortex_bench::Opts; use vortex_bench::create_benchmark; @@ -74,7 +75,7 @@ async fn main() -> anyhow::Result<()> { let args = Args::parse(); let opts = Opts::from(args.options); - setup_logging_and_tracing(args.verbose, args.tracing)?; + setup_logging_and_tracing(args.verbose, args.tracing, LogFormat::Text)?; let benchmark = create_benchmark(args.benchmark, &opts)?; diff --git a/benchmarks/random-access-bench/src/main.rs b/benchmarks/random-access-bench/src/main.rs index 852332528a8..485aa954c9e 100644 --- a/benchmarks/random-access-bench/src/main.rs +++ b/benchmarks/random-access-bench/src/main.rs @@ -16,6 +16,7 @@ use rand_distr::Distribution; use rand_distr::Exp; use vortex_bench::Engine; use vortex_bench::Format; +use vortex_bench::LogFormat; use vortex_bench::Target; use vortex_bench::create_output_writer; use vortex_bench::datasets::feature_vectors::FeatureVectorsData; @@ -190,7 +191,7 @@ struct Args { async fn main() -> Result<()> { let args = Args::parse(); - setup_logging_and_tracing(args.verbose, args.tracing)?; + setup_logging_and_tracing(args.verbose, args.tracing, LogFormat::Text)?; let datasets: Vec> = args .datasets diff --git a/vortex-bench/Cargo.toml b/vortex-bench/Cargo.toml index 62d302f12e1..5bffd638534 100644 --- a/vortex-bench/Cargo.toml +++ b/vortex-bench/Cargo.toml @@ -53,6 +53,7 @@ tracing = { workspace = true } tracing-perfetto = { workspace = true } tracing-subscriber = { workspace = true, features = [ "env-filter", + "json", "tracing-log", ] } url = { workspace = true } diff --git a/vortex-bench/src/bin/data-gen.rs b/vortex-bench/src/bin/data-gen.rs index d38c977a6c6..63bb2ebd06d 100644 --- a/vortex-bench/src/bin/data-gen.rs +++ b/vortex-bench/src/bin/data-gen.rs @@ -17,6 +17,7 @@ use vortex_bench::Benchmark; use vortex_bench::BenchmarkArg; use vortex_bench::CompactionStrategy; use vortex_bench::Format; +use vortex_bench::LogFormat; use vortex_bench::Opt; use vortex_bench::Opts; use vortex_bench::conversions::convert_parquet_directory_to_vortex; @@ -37,6 +38,11 @@ struct Args { #[arg(long)] tracing: bool, + /// Format for the primary stderr log sink. `text` is the default human + /// readable format; `json` emits newline-delimited JSON suitable for `jq`. + #[arg(long, value_enum, default_value_t = LogFormat::Text)] + log_format: LogFormat, + #[arg(long, value_delimiter = ',', value_parser = value_parser!(Format))] formats: Vec, @@ -49,7 +55,7 @@ async fn main() -> anyhow::Result<()> { let args = Args::parse(); let opts = Opts::from(args.options); - setup_logging_and_tracing(args.verbose, args.tracing)?; + setup_logging_and_tracing(args.verbose, args.tracing, args.log_format)?; let benchmark = create_benchmark(args.benchmark, &opts)?; diff --git a/vortex-bench/src/conversions.rs b/vortex-bench/src/conversions.rs index 3f21ab30ba0..f9e3c9e5ec7 100644 --- a/vortex-bench/src/conversions.rs +++ b/vortex-bench/src/conversions.rs @@ -179,6 +179,11 @@ pub async fn convert_parquet_directory_to_vortex( let parquet_file_path = parquet_path.join(format!("{filename}.parquet")); let output_path = vortex_dir.join(format!("{filename}.{}", format.ext())); + let span = tracing::info_span!( + "compress_file", + file = %filename, + strategy = ?compaction, + ); tokio::spawn( async move { idempotent_async(output_path.as_path(), move |vtx_file| async move { @@ -192,7 +197,7 @@ pub async fn convert_parquet_directory_to_vortex( .await .expect("Failed to write Vortex file") } - .in_current_span(), + .instrument(span), ) }) .buffer_unordered(concurrency) diff --git a/vortex-bench/src/utils/logging.rs b/vortex-bench/src/utils/logging.rs index cdc99b8fb59..a4d8a51f89b 100644 --- a/vortex-bench/src/utils/logging.rs +++ b/vortex-bench/src/utils/logging.rs @@ -4,34 +4,88 @@ use std::fs::File; use std::io::IsTerminal; +use clap::ValueEnum; use tracing::level_filters::LevelFilter; use tracing_perfetto::PerfettoLayer; use tracing_subscriber::EnvFilter; +use tracing_subscriber::Layer; use tracing_subscriber::prelude::*; -/// Initialize logging/tracing for a benchmark -pub fn setup_logging_and_tracing(verbose: bool, tracing: bool) -> anyhow::Result<()> { +/// Format for the primary stderr log sink. +/// +/// Selects between a human-oriented text formatter and a newline-delimited JSON +/// formatter. The two are mutually exclusive — pick whichever matches how you'll +/// consume the output. The choice is orthogonal to +/// [`setup_logging_and_tracing`]'s `tracing` flag, which controls a separate +/// Perfetto trace file and is unaffected by the format selected here. +/// +/// See the crate-level documentation of `vortex-compressor` for the full +/// inventory of fields emitted under each tracing target. +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, ValueEnum)] +pub enum LogFormat { + /// Human-readable single-line records, with level, file, and line prefixes. + /// ANSI coloring is enabled automatically when stderr is a terminal. This + /// is the default and matches the historical behavior of this crate. + #[default] + Text, + /// Newline-delimited JSON — one complete JSON object per event, written to + /// stderr. Each record includes the event fields as typed values plus the + /// full span stack under the `spans` key, making it suitable for piping + /// into `jq` or ingestion into a log aggregator. + /// + /// Not to be confused with the Perfetto trace format emitted when the + /// `tracing` flag is set — that is a single-document Chrome Trace Event + /// file designed to be loaded into the Perfetto UI, and cannot be + /// meaningfully filtered line-by-line. + Json, +} + +/// Initialize logging/tracing for a benchmark. +/// +/// - `verbose`: when `RUST_LOG` is unset, raises the default filter from `INFO` +/// to `TRACE`. Has no effect when `RUST_LOG` is set (the env var wins). +/// - `perfetto`: when `true`, additionally attaches a +/// [`tracing_perfetto::PerfettoLayer`] that writes span begin/end events to +/// `trace.json` in the current directory. Intended to be loaded into the +/// Perfetto UI for flamegraph visualization. +/// - `format`: controls the primary stderr sink's formatting. See [`LogFormat`]. +pub fn setup_logging_and_tracing( + verbose: bool, + perfetto: bool, + format: LogFormat, +) -> anyhow::Result<()> { let filter = default_env_filter(verbose); - let fmt_layer = tracing_subscriber::fmt::layer() - .with_writer(std::io::stderr) - .with_level(true) - .with_file(true) - .with_line_number(true) - .with_ansi(std::io::stderr().is_terminal()); + let perfetto_layer = perfetto + .then(|| { + Ok::<_, anyhow::Error>( + PerfettoLayer::new(File::create("trace.json")?).with_debug_annotations(true), + ) + }) + .transpose()?; + + // `fmt::layer()` and `fmt::layer().json()` produce different concrete + // types, so we erase each to a `dyn Layer` via `.boxed()` and keep the + // registry chain uniform. + let fmt_layer: Box + Send + Sync> = match format { + LogFormat::Text => tracing_subscriber::fmt::layer() + .with_writer(std::io::stderr) + .with_level(true) + .with_file(true) + .with_line_number(true) + .with_ansi(std::io::stderr().is_terminal()) + .boxed(), + LogFormat::Json => tracing_subscriber::fmt::layer() + .json() + .with_writer(std::io::stderr) + .with_current_span(true) + .with_span_list(true) + .boxed(), + }; tracing_subscriber::registry() .with(filter) - .with( - tracing - .then(|| { - Ok::<_, anyhow::Error>( - PerfettoLayer::new(File::create("trace.json")?) - .with_debug_annotations(true), - ) - }) - .transpose()?, - ) + .with(perfetto_layer) .with(fmt_layer) .init(); diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index 22a14aaf88e..c80c26ddf06 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -358,7 +358,7 @@ impl CascadingCompressor { /// [`matches`]: Scheme::matches /// [`stats_options`]: Scheme::stats_options /// [`choose_scheme`]: Self::choose_scheme - #[expect(clippy::cognitive_complexity, reason = "tracing")] + #[allow(clippy::cognitive_complexity, reason = "tracing sometimes enabled")] fn choose_and_compress( &self, canonical: Canonical, @@ -542,7 +542,7 @@ impl CascadingCompressor { /// registration order (earlier in the list wins). /// /// [`expected_compression_ratio`]: Scheme::expected_compression_ratio - #[expect(clippy::cognitive_complexity, reason = "tracing")] + #[allow(clippy::cognitive_complexity, reason = "tracing sometimes enabled")] fn choose_best_scheme( &self, schemes: &[&'static dyn Scheme], From 43e333abc533a01dd39ff5a184670495bf5e2ab0 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Fri, 10 Apr 2026 14:11:02 -0400 Subject: [PATCH 4/4] clean up Signed-off-by: Connor Tsui --- benchmarks/compress-bench/src/main.rs | 8 +- vortex-bench/src/bin/data-gen.rs | 5 +- vortex-btrblocks/Cargo.toml | 2 +- vortex-btrblocks/src/canonical_compressor.rs | 20 +- vortex-btrblocks/src/lib.rs | 14 +- vortex-btrblocks/src/schemes/integer.rs | 4 - vortex-compressor/src/compressor.rs | 134 ++++--- vortex-compressor/src/lib.rs | 31 +- vortex-compressor/tests/tracing.rs | 369 +++++++++++++++++++ 9 files changed, 494 insertions(+), 93 deletions(-) diff --git a/benchmarks/compress-bench/src/main.rs b/benchmarks/compress-bench/src/main.rs index 0e53d7f9b9b..807787019b8 100644 --- a/benchmarks/compress-bench/src/main.rs +++ b/benchmarks/compress-bench/src/main.rs @@ -70,13 +70,19 @@ struct Args { output_path: Option, #[arg(long)] tracing: bool, + /// Format for the primary stderr log sink. `text` is the default human-readable format; + /// `json` emits one JSON object per event, suitable for piping into `jq`. + /// + /// See [`LogFormat`] for the full details. + #[arg(long, value_enum, default_value_t = LogFormat::Text)] + log_format: LogFormat, } #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); - setup_logging_and_tracing(args.verbose, args.tracing, LogFormat::Text)?; + setup_logging_and_tracing(args.verbose, args.tracing, args.log_format)?; run_compress( args.iterations, diff --git a/vortex-bench/src/bin/data-gen.rs b/vortex-bench/src/bin/data-gen.rs index 63bb2ebd06d..46c0dc342fe 100644 --- a/vortex-bench/src/bin/data-gen.rs +++ b/vortex-bench/src/bin/data-gen.rs @@ -38,8 +38,9 @@ struct Args { #[arg(long)] tracing: bool, - /// Format for the primary stderr log sink. `text` is the default human - /// readable format; `json` emits newline-delimited JSON suitable for `jq`. + /// Format for the primary stderr log sink. `text` is the default + /// human-readable format; `json` emits one JSON object per event, suitable + /// for piping into `jq`. See [`LogFormat`] for the full details. #[arg(long, value_enum, default_value_t = LogFormat::Text)] log_format: LogFormat, diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 74e88a9b1b9..dab4f27fb51 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -19,7 +19,7 @@ num-traits = { workspace = true } pco = { workspace = true, optional = true } rand = { workspace = true } rustc-hash = { workspace = true } -tracing = { workspace = true, features = ["std", "attributes"] } +tracing = { workspace = true } vortex-alp = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } diff --git a/vortex-btrblocks/src/canonical_compressor.rs b/vortex-btrblocks/src/canonical_compressor.rs index 98c265caa89..e172afcae3b 100644 --- a/vortex-btrblocks/src/canonical_compressor.rs +++ b/vortex-btrblocks/src/canonical_compressor.rs @@ -39,23 +39,9 @@ pub struct BtrBlocksCompressor( impl BtrBlocksCompressor { /// Compresses an array using BtrBlocks-inspired compression. /// - /// This call is instrumented with a `vortex_compressor::cascade` span named - /// `BtrBlocksCompressor::compress` so that downstream tracing consumers - /// (e.g. `tracing-perfetto`) have a distinct entry frame to pivot on, nested - /// above the generic [`CascadingCompressor::compress`] span that actually - /// runs the pipeline. See the `Observability` section of the - /// [`vortex_compressor`] crate docs for the full tracing reference. - #[tracing::instrument( - target = "vortex_compressor::cascade", - name = "BtrBlocksCompressor::compress", - level = "trace", - skip_all, - fields( - len = array.len(), - nbytes = array.nbytes(), - dtype = %array.dtype(), - ), - )] + /// This is a thin delegate to [`CascadingCompressor::compress`], which owns the tracing + /// instrumentation. See the `Observability` section of the [`vortex_compressor`] crate + /// docs for the full tracing reference. pub fn compress(&self, array: &ArrayRef) -> VortexResult { self.0.compress(array) } diff --git a/vortex-btrblocks/src/lib.rs b/vortex-btrblocks/src/lib.rs index fd74b80c7f2..66e08d806d5 100644 --- a/vortex-btrblocks/src/lib.rs +++ b/vortex-btrblocks/src/lib.rs @@ -54,16 +54,12 @@ //! //! # Observability //! -//! [`BtrBlocksCompressor`] participates in the [`vortex_compressor`] tracing target system. -//! See the [`vortex_compressor` crate docs](vortex_compressor#observability) for the full -//! reference on targets, spans, and events. +//! [`BtrBlocksCompressor`] is a thin delegate to [`CascadingCompressor`], which owns all +//! tracing instrumentation. See the +//! [`vortex_compressor` crate docs](vortex_compressor#observability) for the full reference +//! on targets, spans, and events. //! -//! The top-level [`BtrBlocksCompressor::compress`] call adds its own -//! `vortex_compressor::cascade` span (named `BtrBlocksCompressor::compress`) that nests -//! above the generic cascading-compressor pipeline, giving downstream trace consumers a -//! distinct entry frame. -//! -//! Quick start — one line per leaf with scheme, estimated ratio, actual ratio, accepted?: +//! Quick start: one line per leaf with scheme, estimated ratio, actual ratio, accepted?: //! //! ```text //! RUST_LOG=vortex_compressor::encode=debug cargo test -p vortex-btrblocks diff --git a/vortex-btrblocks/src/schemes/integer.rs b/vortex-btrblocks/src/schemes/integer.rs index 783bb382be9..c25d5ac6b39 100644 --- a/vortex-btrblocks/src/schemes/integer.rs +++ b/vortex-btrblocks/src/schemes/integer.rs @@ -294,10 +294,6 @@ impl Scheme for ZigZagScheme { let compressed = compressor.compress_child(&encoded.into_array(), &ctx, self.id(), 0)?; - // NOTE: scheme-level compression results are emitted centrally as the - // `scheme.compress_result` event on the `vortex_compressor::encode` - // target. See the `Observability` section of the `vortex_compressor` crate docs. - Ok(ZigZag::try_new(compressed)?.into_array()) } } diff --git a/vortex-compressor/src/compressor.rs b/vortex-compressor/src/compressor.rs index c80c26ddf06..6b87730775d 100644 --- a/vortex-compressor/src/compressor.rs +++ b/vortex-compressor/src/compressor.rs @@ -38,6 +38,7 @@ use crate::builtins::IntDictScheme; use crate::ctx::CompressorContext; use crate::estimate::CompressionEstimate; use crate::estimate::DeferredEstimate; +use crate::estimate::EstimateFn; use crate::estimate::EstimateVerdict; use crate::estimate::estimate_compression_ratio_with_sampling; use crate::estimate::is_better_ratio; @@ -72,13 +73,15 @@ const TARGET_CASCADE: &str = "vortex_compressor::cascade"; /// Emits a structured `scheme.evaluated` trace event on [`TARGET_SELECT`] for one scheme's /// initial estimation verdict. /// -/// For `Ratio(r)` the numeric estimate is recorded directly. For `Sample` and `Estimate` -/// the ratio is not yet known at this point; a follow-up `scheme.evaluated.resolved` event -/// is emitted by the caller after the deferred computation finishes. +/// For [`CompressionEstimate::Verdict(EstimateVerdict::Ratio)`] the numeric estimate is recorded +/// directly as a typed `f64`, so JSON subscribers get a proper number. For all other variants the +/// `ratio` field is omitted entirely. The `kind` field distinguishes the variants. For deferred +/// estimates a follow-up `scheme.evaluated.resolved` event is emitted by the caller once the +/// deferred computation finishes. /// -/// Defined as a standalone helper (rather than inlined) because the `match` expression that -/// extracts `kind` and the optional `ratio` field is the only repetition worth factoring out -/// of [`CascadingCompressor::choose_best_scheme`]. +/// Defined as a standalone helper (rather than inlined) because the `match` expression +/// that extracts `kind` is the only repetition worth factoring out of +/// [`CascadingCompressor::choose_best_scheme`]. fn emit_scheme_evaluated(scheme: &'static dyn Scheme, estimate: &CompressionEstimate) { let kind: &'static str = match estimate { CompressionEstimate::Verdict(EstimateVerdict::Skip) => "Skip", @@ -542,7 +545,6 @@ impl CascadingCompressor { /// registration order (earlier in the list wins). /// /// [`expected_compression_ratio`]: Scheme::expected_compression_ratio - #[allow(clippy::cognitive_complexity, reason = "tracing sometimes enabled")] fn choose_best_scheme( &self, schemes: &[&'static dyn Scheme], @@ -571,53 +573,16 @@ impl CascadingCompressor { } } CompressionEstimate::Deferred(DeferredEstimate::Sample) => { - let sample_ratio = estimate_compression_ratio_with_sampling( - scheme, - self, - data.array(), - ctx.clone(), - )?; - - tracing::trace!( - target: TARGET_SELECT, - scheme = %scheme.id(), - kind = "Sample", - ratio = sample_ratio, - "scheme.evaluated.resolved", - ); - - if is_better_ratio(sample_ratio, &best) { - best = Some((scheme, sample_ratio)); - } + self.check_sample_scheme(data, &ctx, &mut best, scheme)?; } CompressionEstimate::Deferred(DeferredEstimate::Callback(estimate_callback)) => { - let verdict = estimate_callback(self, data, ctx.clone())?; - let resolved_kind = match verdict { - EstimateVerdict::Skip => "Skip", - EstimateVerdict::AlwaysUse => "AlwaysUse", - EstimateVerdict::Ratio(_) => "Ratio", - }; - if let EstimateVerdict::Ratio(ratio) = verdict { - tracing::trace!( - target: TARGET_SELECT, - scheme = %scheme.id(), - kind = "Estimate", - resolved_kind, - ratio, - "scheme.evaluated.resolved", - ); - } else { - tracing::trace!( - target: TARGET_SELECT, - scheme = %scheme.id(), - kind = "Estimate", - resolved_kind, - "scheme.evaluated.resolved", - ); - } - if let Some(winner_estimate) = - Self::check_and_update_estimate_verdict(&mut best, scheme, verdict) - { + if let Some(winner_estimate) = self.check_estimate_callback( + data, + &ctx, + &mut best, + scheme, + estimate_callback, + )? { return Ok(Some((scheme, winner_estimate))); } } @@ -627,6 +592,71 @@ impl CascadingCompressor { Ok(best.map(|(scheme, ratio)| (scheme, WinnerEstimate::Ratio(ratio)))) } + /// Helper function for sampling a scheme to get an estimated compression ratio. + fn check_sample_scheme( + &self, + data: &mut ArrayAndStats, + ctx: &CompressorContext, + best: &mut Option<(&'static dyn Scheme, f64)>, + scheme: &'static dyn Scheme, + ) -> VortexResult<()> { + let sample_ratio = + estimate_compression_ratio_with_sampling(scheme, self, data.array(), ctx.clone())?; + + tracing::trace!( + target: TARGET_SELECT, + scheme = %scheme.id(), + kind = "Sample", + ratio = sample_ratio, + "scheme.evaluated.resolved", + ); + + if is_better_ratio(sample_ratio, &*best) { + *best = Some((scheme, sample_ratio)); + } + + Ok(()) + } + + /// Helper function for running a custom compression ratio estimation callback for a scheme. + fn check_estimate_callback( + &self, + data: &mut ArrayAndStats, + ctx: &CompressorContext, + best: &mut Option<(&'static dyn Scheme, f64)>, + scheme: &'static dyn Scheme, + estimate_callback: Box, + ) -> VortexResult> { + let verdict = estimate_callback(self, data, ctx.clone())?; + let resolved_kind = match verdict { + EstimateVerdict::Skip => "Skip", + EstimateVerdict::AlwaysUse => "AlwaysUse", + EstimateVerdict::Ratio(_) => "Ratio", + }; + if let EstimateVerdict::Ratio(ratio) = verdict { + tracing::trace!( + target: TARGET_SELECT, + scheme = %scheme.id(), + kind = "Estimate", + resolved_kind, + ratio, + "scheme.evaluated.resolved", + ); + } else { + tracing::trace!( + target: TARGET_SELECT, + scheme = %scheme.id(), + kind = "Estimate", + resolved_kind, + "scheme.evaluated.resolved", + ); + } + + Ok(Self::check_and_update_estimate_verdict( + best, scheme, verdict, + )) + } + /// Updates `best` from a terminal estimate verdict. fn check_and_update_estimate_verdict( best: &mut Option<(&'static dyn Scheme, f64)>, diff --git a/vortex-compressor/src/lib.rs b/vortex-compressor/src/lib.rs index e25620a2836..142a47ee924 100644 --- a/vortex-compressor/src/lib.rs +++ b/vortex-compressor/src/lib.rs @@ -82,16 +82,33 @@ //! //! | Event | Target | Level | Fields | //! |-----------------------------|-------------------|-------|-------------------------------------------------------------------------------------------------| -//! | `scheme.evaluated` | select | trace | `scheme`, `kind`, `ratio` (Option) | +//! | `scheme.evaluated` | select | trace | `scheme`, `kind`, `ratio` (only when `kind = "Ratio"`) | //! | `scheme.evaluated.resolved` | select | trace | `scheme`, `kind`, `resolved_kind`?, `ratio`? | -//! | `scheme.winner` | select | debug | `scheme`, `estimated_ratio`, `candidate_count` | -//! | `scheme.compress_result` | encode | debug | `scheme`, `before_nbytes`, `after_nbytes`, `estimated_ratio`, `actual_ratio`, `accepted` | +//! | `scheme.winner` | select | debug | `scheme`, `candidate_count`, and either `estimated_ratio` or `always_use = true` | +//! | `scheme.compress_result` | encode | debug | `scheme`, `before_nbytes`, `after_nbytes`, `actual_ratio`, `accepted`, and either `estimated_ratio` or `always_use = true` | //! | `sample.collected` | estimate | trace | `scheme`, `sample_count`, `sample_size`, `sampled_len`, `source_len` | //! | `sample.result` | estimate | debug | `scheme`, `sampled_before`, `sampled_after`, `sampled_ratio` | -//! | `short_circuit` | select / cascade | debug | `reason` (`cascade_exhausted` \| `no_schemes` \| `empty` \| `all_null` \| `fell_through` \| `larger_output`), scheme?/parent? | -//! -//! An `estimated_ratio` of [`f64::INFINITY`] indicates a scheme that returned -//! [`CompressionEstimate::AlwaysUse`](estimate::CompressionEstimate::AlwaysUse). +//! | `short_circuit` | select / cascade | debug | `reason` plus reason-specific fields (see below) | +//! +//! ### `short_circuit` reasons and fields +//! +//! The `short_circuit` event reports six distinct reasons, each carrying a different set of +//! fields. Downstream tooling should branch on `reason` before reading the other fields. +//! +//! | `reason` | Target | Additional fields | +//! |---------------------|-----------|---------------------------------------------------------------------------------------| +//! | `cascade_exhausted` | cascade | `parent`, `child_index` | +//! | `no_schemes` | select | — (no additional fields) | +//! | `empty` | select | — (no additional fields) | +//! | `all_null` | select | — (no additional fields) | +//! | `fell_through` | select | `candidate_count` | +//! | `larger_output` | select | `scheme`, `before_nbytes`, `after_nbytes`, `actual_ratio`, plus either `estimated_ratio` or `always_use = true` | +//! +//! The `always_use` boolean (emitted on `scheme.winner`, `scheme.compress_result`, and the +//! `larger_output` short-circuit) indicates the winner was a scheme that returned +//! [`EstimateVerdict::AlwaysUse`](estimate::EstimateVerdict::AlwaysUse) and therefore +//! did not produce a numeric estimate. It is emitted in place of `estimated_ratio` so that +//! JSON subscribers never see a non-finite number in the `estimated_ratio` slot. //! //! Field names are considered stable and are meant to be matched directly by downstream //! observability tooling. This means `tracing-opentelemetry`, `tracing-perfetto`, and diff --git a/vortex-compressor/tests/tracing.rs b/vortex-compressor/tests/tracing.rs index ee479be1b15..290d50d37de 100644 --- a/vortex-compressor/tests/tracing.rs +++ b/vortex-compressor/tests/tracing.rs @@ -32,12 +32,21 @@ use tracing_subscriber::Layer; use tracing_subscriber::layer::Context; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::Registry; +use vortex_array::ArrayRef; +use vortex_array::Canonical; use vortex_array::IntoArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::validity::Validity; use vortex_buffer::Buffer; use vortex_compressor::CascadingCompressor; use vortex_compressor::builtins::IntDictScheme; +use vortex_compressor::ctx::CompressorContext; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::EstimateVerdict; +use vortex_compressor::scheme::Scheme; +use vortex_compressor::scheme::SchemeExt; +use vortex_compressor::stats::ArrayAndStats; +use vortex_error::VortexResult; /// A captured tracing event: its target, name, and structured fields. #[derive(Debug, Clone)] @@ -256,3 +265,363 @@ fn sampling_emits_sample_collected_and_result() { .expect("sampled_ratio parses as f64"); assert!(ratio.is_finite() && ratio > 0.0, "ratio = {ratio}"); } + +// --------------------------------------------------------------------------- +// Custom test schemes +// +// The tests below drive short-circuit paths that cannot be triggered with the +// standard built-in schemes on their own. Each scheme is a zero-sized struct +// so it can be placed behind a `&'static dyn Scheme` via rvalue static +// promotion, matching how the built-in schemes are registered. +// --------------------------------------------------------------------------- + +/// Matches integer primitive arrays but always returns +/// [`CompressionEstimate::Verdict(EstimateVerdict::Skip)`], so no scheme ever wins. Drives the +/// `fell_through` short-circuit. +#[derive(Debug)] +struct AlwaysSkipScheme; + +impl Scheme for AlwaysSkipScheme { + fn scheme_name(&self) -> &'static str { + "test.always_skip" + } + + fn matches(&self, canonical: &Canonical) -> bool { + matches!(canonical, Canonical::Primitive(p) if p.ptype().is_int()) + } + + fn expected_compression_ratio( + &self, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> CompressionEstimate { + CompressionEstimate::Verdict(EstimateVerdict::Skip) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> VortexResult { + unreachable!("`AlwaysSkipScheme` never becomes the winner") + } +} + +/// Reports a very high estimated ratio but then produces an output strictly +/// larger than the canonical input by doubling the data. Drives the +/// `larger_output` short-circuit and the `scheme.compress_result` event with +/// `accepted = false`. +#[derive(Debug)] +struct OverestimatingScheme; + +impl Scheme for OverestimatingScheme { + fn scheme_name(&self) -> &'static str { + "test.overestimating" + } + + fn matches(&self, canonical: &Canonical) -> bool { + matches!(canonical, Canonical::Primitive(p) if p.ptype().is_int()) + } + + fn expected_compression_ratio( + &self, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> CompressionEstimate { + CompressionEstimate::Verdict(EstimateVerdict::Ratio(100.0)) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> VortexResult { + // Double the input values so `nbytes` strictly exceeds the canonical encoding. + // The tests only feed `i32` arrays through this scheme, so the typed slice is + // safe. + let slice: Vec = data.array_as_primitive().as_slice::().to_vec(); + let mut doubled = Vec::with_capacity(slice.len() * 2); + doubled.extend_from_slice(&slice); + doubled.extend_from_slice(&slice); + Ok(PrimitiveArray::new(Buffer::from_iter(doubled), Validity::NonNullable).into_array()) + } +} + +/// Macro that declares a recursive test scheme: one that always wins on integer +/// primitive arrays (via a high `Ratio(_)` estimate) and whose `compress` +/// implementation recursively calls [`CascadingCompressor::compress_child`] on +/// the same array. Four distinct instances are declared below so that the +/// self-exclusion rule enforces a new scheme at each cascade level, letting +/// the test drive `MAX_CASCADE = 3` descents before the fourth call hits the +/// `cascade_exhausted` short-circuit. +macro_rules! declare_recursive_scheme { + ($name:ident, $scheme_name:literal, $ratio:expr) => { + #[derive(Debug)] + struct $name; + + impl Scheme for $name { + fn scheme_name(&self) -> &'static str { + $scheme_name + } + + fn matches(&self, canonical: &Canonical) -> bool { + matches!(canonical, Canonical::Primitive(p) if p.ptype().is_int()) + } + + fn expected_compression_ratio( + &self, + _data: &mut ArrayAndStats, + _ctx: CompressorContext, + ) -> CompressionEstimate { + CompressionEstimate::Verdict(EstimateVerdict::Ratio($ratio)) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &mut ArrayAndStats, + ctx: CompressorContext, + ) -> VortexResult { + compressor.compress_child(data.array(), &ctx, self.id(), 0) + } + } + }; +} + +declare_recursive_scheme!(RecursiveSchemeA, "test.recursive.a", 10.0); +declare_recursive_scheme!(RecursiveSchemeB, "test.recursive.b", 9.0); +declare_recursive_scheme!(RecursiveSchemeC, "test.recursive.c", 8.0); +declare_recursive_scheme!(RecursiveSchemeD, "test.recursive.d", 7.0); + +// --------------------------------------------------------------------------- +// Short-circuit event tests +// --------------------------------------------------------------------------- + +/// An all-null nullable primitive array must take the `all_null` short-circuit +/// path before any scheme is evaluated. +#[test] +fn short_circuit_all_null_array_emits_reason_all_null() { + let (events, _guard) = install_capture_layer(); + + let array = PrimitiveArray::new( + Buffer::from_iter(std::iter::repeat_n(0_i32, 50)), + Validity::AllInvalid, + ) + .into_array(); + + let compressor = CascadingCompressor::new(vec![&IntDictScheme]); + let _compressed = compressor.compress(&array).unwrap(); + + let short_circuits: Vec<_> = events + .lock() + .iter() + .filter(|e| e.name == "short_circuit") + .cloned() + .collect(); + + assert!( + short_circuits + .iter() + .any(|e| e.field("reason") == Some("all_null")), + "expected a short_circuit event with reason=all_null, got: {short_circuits:#?}", + ); +} + +/// When every eligible scheme returns +/// [`CompressionEstimate::Verdict(EstimateVerdict::Skip)`], the +/// compressor must emit a `fell_through` short-circuit carrying the +/// `candidate_count`. +#[test] +fn short_circuit_fell_through_when_all_schemes_skip() { + let (events, _guard) = install_capture_layer(); + + let values: Vec = (0..50).collect(); + let array = PrimitiveArray::new(Buffer::from_iter(values), Validity::NonNullable).into_array(); + + let compressor = CascadingCompressor::new(vec![&AlwaysSkipScheme]); + let _compressed = compressor.compress(&array).unwrap(); + + let events = events.lock(); + let sc = events + .iter() + .find(|e| e.name == "short_circuit" && e.field("reason") == Some("fell_through")) + .expect("expected a short_circuit event with reason=fell_through"); + assert_eq!(sc.target, "vortex_compressor::select"); + assert_eq!(sc.field("candidate_count"), Some("1")); +} + +/// When the winning scheme produces an output larger than the canonical input, +/// the compressor must emit a `scheme.compress_result` with `accepted = false` +/// followed by a `larger_output` short-circuit. The `larger_output` event must +/// carry `estimated_ratio` (not `always_use`) because the scheme returned a +/// numeric `Ratio`, and all byte-count and ratio fields must be present and +/// parseable. +#[test] +fn short_circuit_larger_output_when_scheme_grows_the_input() { + let (events, _guard) = install_capture_layer(); + + let values: Vec = (0..50).collect(); + let array = PrimitiveArray::new(Buffer::from_iter(values), Validity::NonNullable).into_array(); + + let compressor = CascadingCompressor::new(vec![&OverestimatingScheme]); + let _compressed = compressor.compress(&array).unwrap(); + + let events = events.lock(); + + let compress_result = events + .iter() + .find(|e| e.name == "scheme.compress_result") + .expect("expected a scheme.compress_result event"); + assert_eq!(compress_result.target, "vortex_compressor::encode"); + assert_eq!(compress_result.field("scheme"), Some("test.overestimating")); + assert_eq!(compress_result.field("accepted"), Some("false")); + + let larger_output = events + .iter() + .find(|e| e.name == "short_circuit" && e.field("reason") == Some("larger_output")) + .expect("expected a short_circuit event with reason=larger_output"); + assert_eq!(larger_output.target, "vortex_compressor::select"); + assert_eq!(larger_output.field("scheme"), Some("test.overestimating")); + assert!( + larger_output + .field("before_nbytes") + .unwrap() + .parse::() + .is_ok() + ); + assert!( + larger_output + .field("after_nbytes") + .unwrap() + .parse::() + .is_ok() + ); + assert!( + larger_output + .field("actual_ratio") + .unwrap() + .parse::() + .is_ok() + ); + assert!( + larger_output + .field("estimated_ratio") + .unwrap() + .parse::() + .is_ok() + ); + // Scheme returned `Ratio(_)`, not `AlwaysUse`, so `always_use` must be absent + // so that JSON subscribers see a proper `estimated_ratio` number. + assert!(larger_output.field("always_use").is_none()); + + // The byte counts must also satisfy `after_nbytes > before_nbytes`, which is + // the whole point of this short-circuit. + let before: u64 = larger_output + .field("before_nbytes") + .unwrap() + .parse() + .unwrap(); + let after: u64 = larger_output + .field("after_nbytes") + .unwrap() + .parse() + .unwrap(); + assert!( + after > before, + "larger_output should only fire when after_nbytes > before_nbytes: \ + before = {before}, after = {after}", + ); +} + +/// Four distinct schemes that each recursively call `compress_child` will +/// exhaust the cascade budget after three descents. The fourth call must hit +/// the `cascade_exhausted` short-circuit at depth 0 (`allowed_cascading = 0`). +#[test] +fn short_circuit_cascade_exhausted_after_max_depth_descents() { + let (events, _guard) = install_capture_layer(); + + let values: Vec = (0..50).collect(); + let array = PrimitiveArray::new(Buffer::from_iter(values), Validity::NonNullable).into_array(); + + let compressor = CascadingCompressor::new(vec![ + &RecursiveSchemeA, + &RecursiveSchemeB, + &RecursiveSchemeC, + &RecursiveSchemeD, + ]); + let _compressed = compressor.compress(&array).unwrap(); + + let events = events.lock(); + let exhausted: Vec<_> = events + .iter() + .filter(|e| e.name == "short_circuit" && e.field("reason") == Some("cascade_exhausted")) + .collect(); + assert!( + !exhausted.is_empty(), + "expected at least one short_circuit event with reason=cascade_exhausted, got: {:#?}", + events + .iter() + .filter(|e| e.name == "short_circuit") + .collect::>(), + ); + // The event is emitted on the cascade target (not select), and carries + // `parent` and `child_index` fields identifying the call site. + let first = exhausted[0]; + assert_eq!(first.target, "vortex_compressor::cascade"); + assert!(first.field("parent").is_some()); + assert!(first.field("child_index").is_some()); +} + +/// The `Sample` estimation path must emit a `scheme.evaluated` with +/// `kind = "Sample"` (no `ratio` field — the estimate is not yet known), +/// followed later by a `scheme.evaluated.resolved` carrying the computed +/// `ratio`. +#[test] +fn scheme_evaluated_resolved_emitted_after_sampling() { + use vortex_array::arrays::VarBinArray; + use vortex_compressor::builtins::StringDictScheme; + + let (events, _guard) = install_capture_layer(); + + let strs: Vec<&str> = (0..10_000) + .map(|i| ["red", "green", "blue"][i % 3]) + .collect(); + let array = VarBinArray::from(strs).into_array(); + + let compressor = CascadingCompressor::new(vec![&StringDictScheme]); + let _compressed = compressor.compress(&array).unwrap(); + + let events = events.lock(); + + let evaluated_idx = events + .iter() + .position(|e| e.name == "scheme.evaluated" && e.field("kind") == Some("Sample")) + .expect("expected a scheme.evaluated event with kind=Sample"); + let evaluated = &events[evaluated_idx]; + // The initial `scheme.evaluated` for a Sample variant must not carry a + // `ratio` field — the resolved value is reported by the follow-up event + // below. + assert!(evaluated.field("ratio").is_none()); + + let resolved_idx = events + .iter() + .position(|e| e.name == "scheme.evaluated.resolved") + .expect("expected a scheme.evaluated.resolved event"); + assert!( + evaluated_idx < resolved_idx, + "scheme.evaluated must precede scheme.evaluated.resolved", + ); + + let resolved = &events[resolved_idx]; + assert_eq!(resolved.target, "vortex_compressor::select"); + assert_eq!(resolved.field("scheme"), Some("vortex.string.dict")); + assert_eq!(resolved.field("kind"), Some("Sample")); + let ratio: f64 = resolved + .field("ratio") + .expect("resolved ratio") + .parse() + .expect("resolved ratio parses as f64"); + assert!(ratio.is_finite() && ratio > 0.0, "ratio = {ratio}"); +}