From 1f67ca0ae4e6d195d3ed7c925433e6db53f17535 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 29 May 2026 15:18:39 +0100 Subject: [PATCH 1/4] some stuff Signed-off-by: Adam Gutglick --- Cargo.lock | 2 + encodings/parquet-variant/Cargo.toml | 2 +- vortex-btrblocks/Cargo.toml | 11 +- vortex-btrblocks/src/builder.rs | 2 + vortex-btrblocks/src/lib.rs | 2 + vortex-btrblocks/src/schemes/binary.rs | 1 - vortex-btrblocks/src/schemes/mod.rs | 3 +- vortex-btrblocks/src/variant/mod.rs | 427 +++++++++++++++++++++++++ 8 files changed, 445 insertions(+), 5 deletions(-) create mode 100644 vortex-btrblocks/src/variant/mod.rs diff --git a/Cargo.lock b/Cargo.lock index ec89bf1161f..37ffdc7484f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9325,6 +9325,7 @@ dependencies = [ "codspeed-divan-compat", "itertools 0.14.0", "num-traits", + "parquet-variant-compute", "pco", "rand 0.10.1", "rstest", @@ -9342,6 +9343,7 @@ dependencies = [ "vortex-fsst", "vortex-mask", "vortex-onpair", + "vortex-parquet-variant", "vortex-pco", "vortex-runend", "vortex-sequence", diff --git a/encodings/parquet-variant/Cargo.toml b/encodings/parquet-variant/Cargo.toml index 980af8a75a1..cb5a8492fea 100644 --- a/encodings/parquet-variant/Cargo.toml +++ b/encodings/parquet-variant/Cargo.toml @@ -12,7 +12,7 @@ readme = { workspace = true } repository = { workspace = true } rust-version = { workspace = true } version = { workspace = true } -publish = false +publish = true [lints] workspace = true diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 1adb6508828..69722215faf 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -16,6 +16,7 @@ version = { workspace = true } [dependencies] itertools = { workspace = true } num-traits = { workspace = true } +parquet-variant-compute = { workspace = true, optional = true } pco = { workspace = true, optional = true } rand = { workspace = true } rustc-hash = { workspace = true } @@ -31,6 +32,7 @@ vortex-fastlanes = { workspace = true } vortex-fsst = { workspace = true } vortex-mask = { workspace = true } vortex-onpair = { workspace = true, optional = true } +vortex-parquet-variant = { workspace = true, optional = true } vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-sequence = { workspace = true } @@ -42,22 +44,29 @@ vortex-zstd = { workspace = true, optional = true } [dev-dependencies] divan = { workspace = true } +rand = { workspace = true } rstest = { workspace = true } test-with = { workspace = true } vortex-array = { workspace = true, features = ["_test-harness"] } vortex-session = { workspace = true } [features] -# This feature enabled unstable encodings for which we don't guarantee stability. + unstable_encodings = [ "dep:vortex-tensor", "dep:vortex-onpair", "vortex-zstd?/unstable_encodings", ] +parquet-variant = [ + "dep:vortex-parquet-variant", + "dep:parquet-variant-compute", + "zstd", +] pco = ["dep:pco", "dep:vortex-pco"] zstd = ["dep:vortex-zstd"] [lints] +# This feature enabled unstable encodings for which we don't guarantee stability. workspace = true [[bench]] diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index 61c40341dbc..9feec82b4ad 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -70,6 +70,8 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ &decimal::DecimalScheme, // Temporal schemes. &temporal::TemporalScheme, + // Binary schemes + &binary::BinaryDictScheme, ]; /// Builder for creating configured [`BtrBlocksCompressor`] instances. diff --git a/vortex-btrblocks/src/lib.rs b/vortex-btrblocks/src/lib.rs index 39db05246a6..6001d77684f 100644 --- a/vortex-btrblocks/src/lib.rs +++ b/vortex-btrblocks/src/lib.rs @@ -58,6 +58,8 @@ mod builder; mod canonical_compressor; /// Compression scheme implementations. pub mod schemes; +#[cfg(feature = "parquet-variant")] +pub mod variant; // Re-export framework types from vortex-compressor for backwards compatibility. // Btrblocks-specific exports. diff --git a/vortex-btrblocks/src/schemes/binary.rs b/vortex-btrblocks/src/schemes/binary.rs index 2e8b28cd396..a39fa48dec6 100644 --- a/vortex-btrblocks/src/schemes/binary.rs +++ b/vortex-btrblocks/src/schemes/binary.rs @@ -3,6 +3,5 @@ //! Binary compression schemes. -// Re-export builtin schemes from vortex-compressor. pub use vortex_compressor::builtins::BinaryConstantScheme; pub use vortex_compressor::builtins::BinaryDictScheme; diff --git a/vortex-btrblocks/src/schemes/mod.rs b/vortex-btrblocks/src/schemes/mod.rs index 16123429e86..8b8629d3f0a 100644 --- a/vortex-btrblocks/src/schemes/mod.rs +++ b/vortex-btrblocks/src/schemes/mod.rs @@ -5,11 +5,10 @@ pub mod binary; pub mod bool; +pub mod decimal; pub mod float; pub mod integer; pub mod string; - -pub mod decimal; pub mod temporal; pub(crate) mod patches; diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs new file mode 100644 index 00000000000..2defc928e46 --- /dev/null +++ b/vortex-btrblocks/src/variant/mod.rs @@ -0,0 +1,427 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Compression scheme for JSON data into binary variant representation + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrow::ArrowSessionExt; +use vortex_array::dtype::extension::ExtDType; +use vortex_array::dtype::extension::ExtId; +use vortex_array::dtype::extension::ExtVTable; +use vortex_array::extension::EmptyMetadata; +use vortex_array::scalar::ScalarValue; +use vortex_compressor::ctx::CompressorContext; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_compressor::scheme::Scheme; +use vortex_compressor::scheme::SchemeExt; +use vortex_compressor::stats::ArrayAndStats; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_parquet_variant::ParquetVariant; +use vortex_parquet_variant::ParquetVariantArrayExt; + +use crate::CascadingCompressor; + +/// Compression scheme that converts JSON string extension arrays to Parquet Variant arrays. +#[derive(Debug)] +pub struct JsonToVariantScheme; + +/// Child indices for recursively compressed Parquet Variant binary children. +mod parquet_variant_children { + /// The Parquet Variant metadata child. + pub const METADATA: usize = 0; + /// The raw Parquet Variant value child. + pub const VALUE: usize = 1; +} + +/// JSON logical type backed by UTF-8 string storage. +#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] +pub struct Json; + +impl ExtVTable for Json { + type Metadata = EmptyMetadata; + type NativeValue<'a> = &'a str; + + fn id(&self) -> ExtId { + ExtId::new("vortex.json") + } + + fn serialize_metadata(&self, _metadata: &Self::Metadata) -> VortexResult> { + Ok(Vec::new()) + } + + fn deserialize_metadata(&self, metadata: &[u8]) -> VortexResult { + vortex_ensure!(metadata.is_empty(), "JSON metadata must be empty"); + Ok(EmptyMetadata) + } + + fn validate_dtype(ext_dtype: &ExtDType) -> VortexResult<()> { + vortex_ensure!( + ext_dtype.storage_dtype().is_utf8(), + "JSON storage dtype must be utf8, got {}", + ext_dtype.storage_dtype() + ); + Ok(()) + } + + fn unpack_native<'a>( + _ext_dtype: &'a ExtDType, + storage_value: &'a ScalarValue, + ) -> VortexResult> { + let ScalarValue::Utf8(value) = storage_value else { + vortex_bail!("JSON storage scalar must be utf8, got {storage_value}"); + }; + Ok(value.as_str()) + } +} + +impl Scheme for JsonToVariantScheme { + fn scheme_name(&self) -> &'static str { + "json_to_variant" + } + + fn matches(&self, canonical: &Canonical) -> bool { + let Canonical::Extension(ext_array) = canonical else { + return false; + }; + + ext_array.ext_dtype().is::() + } + + fn num_children(&self) -> usize { + 2 + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let array = data.array().clone().execute::(exec_ctx)?; + let storage = array.storage_array().clone(); + + if !storage.dtype().is_utf8() { + vortex_bail!("storage must be utf8"); + } + + let arrow_array = { + let session = exec_ctx.session().clone(); + let arrow = session.arrow(); + arrow.execute_arrow(storage, None, exec_ctx)? + }; + + let array = parquet_variant_compute::json_to_variant(&arrow_array)?; + + let parquet_variant = + ParquetVariant::from_arrow_variant(&array)?.downcast::(); + + let compressed_metadata = compressor.compress_child( + parquet_variant.metadata_array(), + &compress_ctx, + self.id(), + parquet_variant_children::METADATA, + exec_ctx, + )?; + let compressed_value = parquet_variant + .value_array() + .map(|value| { + compressor.compress_child( + value, + &compress_ctx, + self.id(), + parquet_variant_children::VALUE, + exec_ctx, + ) + }) + .transpose()?; + + ParquetVariant::try_new( + parquet_variant.validity()?, + compressed_metadata, + compressed_value, + parquet_variant.typed_value_array().cloned(), + ) + .map(IntoArray::into_array) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rand::RngExt; + use rand::SeedableRng; + use rand::rngs::StdRng; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::Extension; + use vortex_array::arrays::ExtensionArray; + use vortex_array::arrays::VarBinView; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::arrays::extension::ExtensionArrayExt; + use vortex_array::session::ArraySession; + use vortex_compressor::builtins::BinaryDictScheme; + use vortex_compressor::builtins::IntConstantScheme; + use vortex_compressor::builtins::StringConstantScheme; + use vortex_compressor::builtins::StringDictScheme; + use vortex_session::VortexSession; + use vortex_zstd::Zstd; + + use super::*; + use crate::schemes::integer::BitPackingScheme; + use crate::schemes::integer::FoRScheme; + use crate::schemes::integer::RunEndScheme; + use crate::schemes::integer::SequenceScheme; + use crate::schemes::integer::SparseScheme; + use crate::schemes::integer::ZigZagScheme; + use crate::schemes::string::FSSTScheme; + use crate::schemes::string::ZstdScheme; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn json_data() -> Vec { + let mut rng = StdRng::seed_from_u64(0); + const ACCOUNT_KEYS: &[&str] = &["account_id", "customer_id", "tenant_id", "buyer_id"]; + const REGION_KEYS: &[&str] = &["region", "market", "country"]; + const REGIONS: &[&str] = &["us-east", "us-west", "eu", "apac", "latam"]; + const STATUS_KEYS: &[&str] = &["status", "payment_state", "lifecycle_state"]; + const STATUSES: &[&str] = &["draft", "open", "paid", "void", "past_due"]; + const AMOUNT_KEYS: &[&str] = &["discount", "tax", "shipping", "credit"]; + const FLAG_KEYS: &[&str] = &["autopay", "fraud_review", "priority", "disputed"]; + const TAGS: &[&str] = &["renewal", "manual", "usage", "trial", "enterprise"]; + + (0..1024) + .map(|_| { + let mut fields = vec![ + format!( + r#""{}":"acct_{:04x}""#, + ACCOUNT_KEYS[rng.random_range(0..ACCOUNT_KEYS.len())], + rng.random::(), + ), + format!( + r#""invoice_total":{}.{:02}"#, + rng.random_range(10_u32..100_000), + rng.random_range(0_u32..100), + ), + format!(r#""line_items":{}"#, rng.random_range(1_u32..250)), + ]; + + if rng.random_bool(0.85) { + fields.push(format!( + r#""{}":"{}""#, + STATUS_KEYS[rng.random_range(0..STATUS_KEYS.len())], + STATUSES[rng.random_range(0..STATUSES.len())], + )); + } + if rng.random_bool(0.75) { + fields.push(format!( + r#""{}":"{}""#, + REGION_KEYS[rng.random_range(0..REGION_KEYS.len())], + REGIONS[rng.random_range(0..REGIONS.len())], + )); + } + if rng.random_bool(0.55) { + fields.push(format!( + r#""{}":{}.{:02}"#, + AMOUNT_KEYS[rng.random_range(0..AMOUNT_KEYS.len())], + rng.random_range(0_u32..2_500), + rng.random_range(0_u32..100), + )); + } + if rng.random_bool(0.40) { + fields.push(format!( + r#""{}":{}"#, + FLAG_KEYS[rng.random_range(0..FLAG_KEYS.len())], + rng.random_bool(0.5), + )); + } + if rng.random_bool(0.30) { + fields.push(format!( + r#""tags":["{}","{}"]"#, + TAGS[rng.random_range(0..TAGS.len())], + TAGS[rng.random_range(0..TAGS.len())], + )); + } + + format!("{{{}}}", fields.join(",")) + }) + .collect() + } + + fn json_array(values: &[String]) -> VortexResult { + let storage = + VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); + Ok(ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array()) + } + + fn parquet_variant_child_compressor() -> CascadingCompressor { + CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &IntConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]) + } + + fn print_comparison_output( + array: &ArrayRef, + string_compressed: &ArrayRef, + compressed: &ArrayRef, + ) { + let compressed_ratio = array.nbytes() as f64 / compressed.nbytes() as f64; + let compressed_array_ratio = string_compressed.nbytes() as f64 / compressed.nbytes() as f64; + println!( + "Compression sizes: input={} bytes, compressed string={} bytes, compressed output={} bytes", + array.nbytes(), + string_compressed.nbytes(), + compressed.nbytes(), + ); + println!("Compressed output ratio: {compressed_ratio:.2}x\n"); + println!("Compressed string / compressed output ratio: {compressed_array_ratio:.2}x\n"); + println!("JSON input encoding tree:\n{}", array.tree_display()); + println!( + "String-compressed encoding tree:\n{}", + string_compressed.tree_display() + ); + println!( + "Compressed output encoding tree:\n{}", + compressed.tree_display() + ); + } + + #[test] + fn parquet_variant_compresses_repeated_json_keys() -> VortexResult<()> { + let array = json_array(&json_data())?; + + let string_compressor = + CascadingCompressor::new(vec![&StringDictScheme, &StringConstantScheme]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let variant_data = ArrayAndStats::new(array.clone(), Default::default()); + let variant_compressed = JsonToVariantScheme.compress( + &variant_compressor, + &variant_data, + CompressorContext::new(), + &mut exec_ctx, + )?; + + assert!( + variant_compressed.is::(), + "expected ParquetVariant output, got encoding {} with dtype {} and {} bytes", + variant_compressed.encoding_id(), + variant_compressed.dtype(), + variant_compressed.nbytes() + ); + assert!( + variant_compressed.nbytes() < string_compressed.nbytes(), + "Parquet Variant conversion should compress repeated JSON keys: \ + variant={} bytes, input={} bytes", + variant_compressed.nbytes(), + string_compressed.nbytes(), + ); + + print_comparison_output(&array, &string_compressed, &variant_compressed); + + Ok(()) + } + + #[test] + fn recursively_compresses_parquet_variant_binary_children() -> VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let variant_data = ArrayAndStats::new(array.clone(), Default::default()); + let compressed = JsonToVariantScheme.compress( + &variant_compressor, + &variant_data, + CompressorContext::new(), + &mut exec_ctx, + )?; + let parquet_variant = compressed.clone().downcast::(); + + assert!( + !parquet_variant.metadata_array().is::(), + "expected Parquet Variant metadata child to be compressed, got {}", + parquet_variant.metadata_array().encoding_id(), + ); + assert!(parquet_variant.value_array().is_some()); + assert!(parquet_variant.typed_value_array().is_none()); + + Ok(()) + } + + #[test] + fn prefers_smaller_extension_storage_over_variant_scheme() -> VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + + let string_compressor = CascadingCompressor::new(vec![ + &StringDictScheme, + &FSSTScheme, + &IntConstantScheme, + &StringConstantScheme, + &FoRScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; + + let variant_compressor = CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &FSSTScheme, + &ZstdScheme, + &IntConstantScheme, + &StringConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + let extension = compressed.clone().downcast::(); + let storage = extension.storage_array(); + assert!( + storage.is::(), + "expected JSON extension storage fallback to use zstd, got {}", + storage.encoding_id(), + ); + + print_comparison_output(&array, &string_compressed, &compressed); + + Ok(()) + } +} From 213266b57cbf1819dc81ffc233e22304438623d8 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 29 May 2026 16:20:07 +0100 Subject: [PATCH 2/4] binary compression Signed-off-by: Adam Gutglick --- vortex-btrblocks/src/builder.rs | 16 +- vortex-btrblocks/src/schemes/binary.rs | 258 +++++++++++++++++++++++++ vortex-btrblocks/src/variant/mod.rs | 78 +++++--- 3 files changed, 325 insertions(+), 27 deletions(-) diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index 9feec82b4ad..f003a014e4b 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -65,13 +65,12 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ // Binary schemes. //////////////////////////////////////////////////////////////////////////////////////////////// &binary::BinaryDictScheme, + &binary::BinaryFSSTScheme, &binary::BinaryConstantScheme, // Decimal schemes. &decimal::DecimalScheme, // Temporal schemes. &temporal::TemporalScheme, - // Binary schemes - &binary::BinaryDictScheme, ]; /// Builder for creating configured [`BtrBlocksCompressor`] instances. @@ -148,7 +147,9 @@ impl BtrBlocksCompressorBuilder { /// Panics if any of the compact schemes are already present. #[cfg(feature = "zstd")] pub fn with_compact(self) -> Self { - let builder = self.with_new_scheme(&string::ZstdScheme); + let builder = self + .with_new_scheme(&string::ZstdScheme) + .with_new_scheme(&binary::BinaryZstdScheme); #[cfg(feature = "pco")] let builder = builder @@ -193,15 +194,20 @@ impl BtrBlocksCompressorBuilder { string::StringDictScheme.id(), string::FSSTScheme.id(), binary::BinaryDictScheme.id(), + binary::BinaryFSSTScheme.id(), ]; #[cfg(feature = "unstable_encodings")] excluded.push(string::OnPairScheme.id()); let builder = self.exclude_schemes(excluded); #[cfg(all(feature = "zstd", feature = "unstable_encodings"))] - let builder = builder.with_new_scheme(&string::ZstdBuffersScheme); + let builder = builder + .with_new_scheme(&string::ZstdBuffersScheme) + .with_new_scheme(&binary::BinaryZstdBuffersScheme); #[cfg(all(feature = "zstd", not(feature = "unstable_encodings")))] - let builder = builder.with_new_scheme(&string::ZstdScheme); + let builder = builder + .with_new_scheme(&string::ZstdScheme) + .with_new_scheme(&binary::BinaryZstdScheme); builder } diff --git a/vortex-btrblocks/src/schemes/binary.rs b/vortex-btrblocks/src/schemes/binary.rs index a39fa48dec6..7839e59138c 100644 --- a/vortex-btrblocks/src/schemes/binary.rs +++ b/vortex-btrblocks/src/schemes/binary.rs @@ -3,5 +3,263 @@ //! Binary compression schemes. +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinArray; +use vortex_array::arrays::primitive::PrimitiveArrayExt; +use vortex_array::arrays::varbin::VarBinArrayExt; pub use vortex_compressor::builtins::BinaryConstantScheme; pub use vortex_compressor::builtins::BinaryDictScheme; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_error::VortexResult; +use vortex_fsst::FSST; +use vortex_fsst::FSSTArrayExt; +use vortex_fsst::fsst_compress; +use vortex_fsst::fsst_train_compressor; + +use crate::ArrayAndStats; +use crate::CascadingCompressor; +use crate::CompressorContext; +use crate::Scheme; +use crate::SchemeExt; + +/// FSST compression for binary values. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct BinaryFSSTScheme; + +/// Zstd compression for binary values. +#[cfg(feature = "zstd")] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct BinaryZstdScheme; + +/// Buffer-level Zstd compression for binary values. +#[cfg(all(feature = "zstd", feature = "unstable_encodings"))] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct BinaryZstdBuffersScheme; + +impl Scheme for BinaryFSSTScheme { + fn scheme_name(&self) -> &'static str { + "vortex.binary.fsst" + } + + fn matches(&self, canonical: &Canonical) -> bool { + canonical.dtype().is_binary() + } + + /// Children: lengths=0, code_offsets=1. + fn num_children(&self) -> usize { + 2 + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let binary = data.array_as_varbinview().into_owned(); + let compressor_fsst = fsst_train_compressor(&binary); + let fsst = fsst_compress( + &binary, + binary.len(), + binary.dtype(), + &compressor_fsst, + exec_ctx, + ); + + let uncompressed_lengths_primitive = fsst + .uncompressed_lengths() + .clone() + .execute::(exec_ctx)? + .narrow(exec_ctx)?; + let compressed_original_lengths = compressor.compress_child( + &uncompressed_lengths_primitive.into_array(), + &compress_ctx, + self.id(), + 0, + exec_ctx, + )?; + + let codes_offsets_primitive = fsst + .codes() + .offsets() + .clone() + .execute::(exec_ctx)? + .narrow(exec_ctx)?; + let compressed_codes_offsets = compressor.compress_child( + &codes_offsets_primitive.into_array(), + &compress_ctx, + self.id(), + 1, + exec_ctx, + )?; + let compressed_codes = VarBinArray::try_new( + compressed_codes_offsets, + fsst.codes().bytes().clone(), + fsst.codes().dtype().clone(), + fsst.codes().validity()?, + )?; + + let fsst = FSST::try_new( + fsst.dtype().clone(), + fsst.symbols().clone(), + fsst.symbol_lengths().clone(), + compressed_codes, + compressed_original_lengths, + exec_ctx, + )?; + + Ok(fsst.into_array()) + } +} + +#[cfg(feature = "zstd")] +impl Scheme for BinaryZstdScheme { + fn scheme_name(&self) -> &'static str { + "vortex.binary.zstd" + } + + fn matches(&self, canonical: &Canonical) -> bool { + canonical.dtype().is_binary() + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + data: &ArrayAndStats, + _compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let compacted = data.array_as_varbinview().into_owned().compact_buffers()?; + Ok( + vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192, exec_ctx)? + .into_array(), + ) + } +} + +#[cfg(all(feature = "zstd", feature = "unstable_encodings"))] +impl Scheme for BinaryZstdBuffersScheme { + fn scheme_name(&self) -> &'static str { + "vortex.binary.zstd_buffers" + } + + fn matches(&self, canonical: &Canonical) -> bool { + canonical.dtype().is_binary() + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + _compressor: &CascadingCompressor, + data: &ArrayAndStats, + _compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + Ok(vortex_zstd::ZstdBuffers::compress(data.array(), 3, exec_ctx.session())?.into_array()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::assert_arrays_eq; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::session::ArraySession; + use vortex_error::VortexResult; + use vortex_fsst::FSST; + use vortex_session::VortexSession; + + use crate::BtrBlocksCompressor; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn binary_fsst_data() -> VarBinViewArray { + VarBinViewArray::from_iter( + (0..1024).map(|idx| { + Some(format!("variant-key-{idx:04}-invoice-total-line-items").into_bytes()) + }), + DType::Binary(Nullability::NonNullable), + ) + } + + #[test] + fn default_compressor_uses_fsst_for_binary_data() -> VortexResult<()> { + let array = binary_fsst_data().into_array(); + let compressed = + BtrBlocksCompressor::default().compress(&array, &mut SESSION.create_execution_ctx())?; + + assert!( + compressed.is::(), + "expected binary data to be FSST-compressed, got {}", + compressed.encoding_id(), + ); + assert!(compressed.nbytes() < array.nbytes()); + + let decompressed = + compressed.execute::(&mut SESSION.create_execution_ctx())?; + assert_arrays_eq!(array, decompressed); + + Ok(()) + } + + #[cfg(feature = "zstd")] + #[test] + fn compact_compressor_uses_zstd_for_binary_data() -> VortexResult<()> { + let array = binary_fsst_data().into_array(); + let compressed = crate::BtrBlocksCompressorBuilder::default() + .with_compact() + .build() + .compress(&array, &mut SESSION.create_execution_ctx())?; + + assert!( + compressed.is::(), + "expected compact binary data to be Zstd-compressed, got {}", + compressed.encoding_id(), + ); + assert!(compressed.nbytes() < array.nbytes()); + + let decompressed = + compressed.execute::(&mut SESSION.create_execution_ctx())?; + assert_arrays_eq!(array, decompressed); + + Ok(()) + } +} diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs index 2defc928e46..12a97ab425c 100644 --- a/vortex-btrblocks/src/variant/mod.rs +++ b/vortex-btrblocks/src/variant/mod.rs @@ -182,10 +182,12 @@ mod tests { use vortex_compressor::builtins::IntConstantScheme; use vortex_compressor::builtins::StringConstantScheme; use vortex_compressor::builtins::StringDictScheme; + use vortex_fsst::FSST; use vortex_session::VortexSession; use vortex_zstd::Zstd; use super::*; + use crate::schemes::binary::BinaryFSSTScheme; use crate::schemes::integer::BitPackingScheme; use crate::schemes::integer::FoRScheme; use crate::schemes::integer::RunEndScheme; @@ -277,6 +279,7 @@ mod tests { CascadingCompressor::new(vec![ &JsonToVariantScheme, &BinaryDictScheme, + &BinaryFSSTScheme, &IntConstantScheme, &FoRScheme, &SparseScheme, @@ -324,13 +327,7 @@ mod tests { let variant_compressor = parquet_variant_child_compressor(); let mut exec_ctx = SESSION.create_execution_ctx(); - let variant_data = ArrayAndStats::new(array.clone(), Default::default()); - let variant_compressed = JsonToVariantScheme.compress( - &variant_compressor, - &variant_data, - CompressorContext::new(), - &mut exec_ctx, - )?; + let variant_compressed = variant_compressor.compress(&array, &mut exec_ctx)?; assert!( variant_compressed.is::(), @@ -358,13 +355,7 @@ mod tests { let variant_compressor = parquet_variant_child_compressor(); let mut exec_ctx = SESSION.create_execution_ctx(); - let variant_data = ArrayAndStats::new(array.clone(), Default::default()); - let compressed = JsonToVariantScheme.compress( - &variant_compressor, - &variant_data, - CompressorContext::new(), - &mut exec_ctx, - )?; + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; let parquet_variant = compressed.clone().downcast::(); assert!( @@ -378,6 +369,48 @@ mod tests { Ok(()) } + #[test] + fn binary_fsst_improves_parquet_variant_child_compression() -> VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + let mut exec_ctx = SESSION.create_execution_ctx(); + let without_binary_fsst = CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &IntConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]) + .compress(&array, &mut exec_ctx)?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + let with_binary_fsst = + parquet_variant_child_compressor().compress(&array, &mut exec_ctx)?; + let parquet_variant = with_binary_fsst.clone().downcast::(); + + assert!( + with_binary_fsst.nbytes() < without_binary_fsst.nbytes(), + "binary FSST should improve Parquet Variant child compression: with={} bytes, without={} bytes", + with_binary_fsst.nbytes(), + without_binary_fsst.nbytes(), + ); + assert!( + parquet_variant + .value_array() + .is_some_and(|value| value.is::()), + "expected Parquet Variant value child to use binary FSST, got {}", + parquet_variant.value_array().map_or_else( + || "missing".to_string(), + |value| value.encoding_id().to_string() + ), + ); + + Ok(()) + } + #[test] fn prefers_smaller_extension_storage_over_variant_scheme() -> VortexResult<()> { let array: ArrayRef = json_array(&json_data())?; @@ -400,7 +433,8 @@ mod tests { &JsonToVariantScheme, &BinaryDictScheme, &FSSTScheme, - &ZstdScheme, + &BinaryFSSTScheme, + // &ZstdScheme, &IntConstantScheme, &StringConstantScheme, &FoRScheme, @@ -412,13 +446,13 @@ mod tests { ]); let mut exec_ctx = SESSION.create_execution_ctx(); let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - let extension = compressed.clone().downcast::(); - let storage = extension.storage_array(); - assert!( - storage.is::(), - "expected JSON extension storage fallback to use zstd, got {}", - storage.encoding_id(), - ); + // let extension = compressed.clone().downcast::(); + // let storage = extension.storage_array(); + // assert!( + // storage.is::(), + // "expected JSON extension storage fallback to use zstd, got {}", + // storage.encoding_id(), + // ); print_comparison_output(&array, &string_compressed, &compressed); From 1bf832137793edc0c3d55874c64fb01fdcf467c1 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 29 May 2026 17:38:13 +0100 Subject: [PATCH 3/4] stash work Signed-off-by: Adam Gutglick --- Cargo.lock | 2 + vortex-btrblocks/Cargo.toml | 6 + vortex-btrblocks/src/variant/mod.rs | 306 +++++++++++++++++++++++++++- 3 files changed, 311 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37ffdc7484f..25ee336b533 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9322,6 +9322,8 @@ dependencies = [ name = "vortex-btrblocks" version = "0.1.0" dependencies = [ + "arrow-array", + "arrow-schema", "codspeed-divan-compat", "itertools 0.14.0", "num-traits", diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 69722215faf..e1c2bc38794 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -14,6 +14,8 @@ rust-version = { workspace = true } version = { workspace = true } [dependencies] +arrow-array = { workspace = true, optional = true } +arrow-schema = { workspace = true, optional = true } itertools = { workspace = true } num-traits = { workspace = true } parquet-variant-compute = { workspace = true, optional = true } @@ -36,6 +38,7 @@ vortex-parquet-variant = { workspace = true, optional = true } vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-sequence = { workspace = true } +vortex-session = { workspace = true, optional = true } vortex-sparse = { workspace = true } vortex-tensor = { workspace = true, optional = true } vortex-utils = { workspace = true } @@ -58,8 +61,11 @@ unstable_encodings = [ "vortex-zstd?/unstable_encodings", ] parquet-variant = [ + "dep:arrow-array", + "dep:arrow-schema", "dep:vortex-parquet-variant", "dep:parquet-variant-compute", + "dep:vortex-session", "zstd", ] pco = ["dep:pco", "dep:vortex-pco"] diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs index 12a97ab425c..0b577c387e7 100644 --- a/vortex-btrblocks/src/variant/mod.rs +++ b/vortex-btrblocks/src/variant/mod.rs @@ -3,18 +3,41 @@ //! Compression scheme for JSON data into binary variant representation +use std::sync::Arc; + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_array::StructArray as ArrowStructArray; +use arrow_schema::DataType; +use arrow_schema::Field; +use vortex_array::Array; +use vortex_array::ArrayId; +use vortex_array::ArrayParts; use vortex_array::ArrayRef; +use vortex_array::ArrayView; use vortex_array::Canonical; +use vortex_array::EmptyArrayData; use vortex_array::ExecutionCtx; +use vortex_array::ExecutionResult; use vortex_array::IntoArray; use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::VariantArray; use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrays::variant::VariantArrayExt; use vortex_array::arrow::ArrowSessionExt; +use vortex_array::arrow::FromArrowArray; +use vortex_array::arrow::to_arrow_null_buffer; +use vortex_array::buffer::BufferHandle; +use vortex_array::dtype::DType; use vortex_array::dtype::extension::ExtDType; use vortex_array::dtype::extension::ExtId; use vortex_array::dtype::extension::ExtVTable; use vortex_array::extension::EmptyMetadata; use vortex_array::scalar::ScalarValue; +use vortex_array::serde::ArrayChildren; +use vortex_array::validity::Validity; +use vortex_array::vtable::NotSupported; +use vortex_array::vtable::VTable; +use vortex_array::vtable::ValidityVTable; use vortex_compressor::ctx::CompressorContext; use vortex_compressor::estimate::CompressionEstimate; use vortex_compressor::estimate::DeferredEstimate; @@ -24,8 +47,12 @@ use vortex_compressor::stats::ArrayAndStats; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; use vortex_parquet_variant::ParquetVariant; use vortex_parquet_variant::ParquetVariantArrayExt; +use vortex_session::VortexSession; +use vortex_session::registry::CachedId; use crate::CascadingCompressor; @@ -41,6 +68,12 @@ mod parquet_variant_children { pub const VALUE: usize = 1; } +mod variant_to_json_children { + pub const VARIANT: usize = 0; + pub const NUM_SLOTS: usize = 1; + pub const SLOT_NAMES: [&str; NUM_SLOTS] = ["variant"]; +} + /// JSON logical type backed by UTF-8 string storage. #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] pub struct Json; @@ -82,6 +115,227 @@ impl ExtVTable for Json { } } +/// Array that exposes a Variant array as JSON strings. +#[derive(Debug, Clone)] +pub struct VariantToJson; + +/// A [`VariantToJson`]-encoded array. +pub type VariantToJsonArray = Array; + +impl VariantToJson { + /// Creates a JSON wrapper around a Variant-typed array. + pub fn try_new(variant: ArrayRef) -> VortexResult { + vortex_ensure!( + variant.dtype().is_variant(), + "VariantToJson expects a Variant array, got {}", + variant.dtype() + ); + + let storage_dtype = DType::Utf8(variant.dtype().nullability()); + let dtype = + DType::Extension(ExtDType::::try_new(EmptyMetadata, storage_dtype)?.erased()); + let len = variant.len(); + + Array::try_from_parts( + ArrayParts::new(VariantToJson, dtype, len, EmptyArrayData) + .with_slots(vec![Some(variant)].into()), + ) + } +} + +impl VTable for VariantToJson { + type TypedArrayData = EmptyArrayData; + type OperationsVTable = NotSupported; + type ValidityVTable = Self; + + fn id(&self) -> ArrayId { + static ID: CachedId = CachedId::new("vortex.variant_to_json"); + *ID + } + + fn validate( + &self, + _data: &Self::TypedArrayData, + dtype: &DType, + len: usize, + slots: &[Option], + ) -> VortexResult<()> { + vortex_ensure!( + slots.len() == variant_to_json_children::NUM_SLOTS, + "VariantToJsonArray expects {} slots, got {}", + variant_to_json_children::NUM_SLOTS, + slots.len() + ); + let variant = slots[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))?; + + let DType::Extension(ext_dtype) = dtype else { + vortex_bail!("VariantToJsonArray dtype must be a JSON extension, got {dtype}"); + }; + vortex_ensure!( + ext_dtype.is::(), + "VariantToJsonArray dtype must be a JSON extension, got {dtype}" + ); + vortex_ensure!( + variant.dtype() == &DType::Variant(dtype.nullability()), + "VariantToJsonArray child dtype {} does not match JSON dtype nullability {}", + variant.dtype(), + dtype + ); + vortex_ensure!( + variant.len() == len, + "VariantToJsonArray child length {} does not match outer length {}", + variant.len(), + len + ); + + Ok(()) + } + + fn nbuffers(_array: ArrayView<'_, Self>) -> usize { + 0 + } + + fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle { + vortex_panic!("VariantToJsonArray buffer index {idx} out of bounds") + } + + fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option { + None + } + + fn serialize( + _array: ArrayView<'_, Self>, + _session: &VortexSession, + ) -> VortexResult>> { + Ok(Some(Vec::new())) + } + + fn deserialize( + &self, + dtype: &DType, + len: usize, + metadata: &[u8], + buffers: &[BufferHandle], + children: &dyn ArrayChildren, + _session: &VortexSession, + ) -> VortexResult> { + vortex_ensure!( + metadata.is_empty(), + "VariantToJsonArray metadata must be empty" + ); + vortex_ensure!( + buffers.is_empty(), + "VariantToJsonArray expects 0 buffers, got {}", + buffers.len() + ); + vortex_ensure!( + children.len() == variant_to_json_children::NUM_SLOTS, + "VariantToJsonArray expects {} children, got {}", + variant_to_json_children::NUM_SLOTS, + children.len() + ); + + let variant_dtype = DType::Variant(dtype.nullability()); + let variant = children.get(variant_to_json_children::VARIANT, &variant_dtype, len)?; + + Ok( + ArrayParts::new(self.clone(), dtype.clone(), len, EmptyArrayData) + .with_slots(vec![Some(variant)].into()), + ) + } + + fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { + match variant_to_json_children::SLOT_NAMES.get(idx) { + Some(name) => (*name).to_string(), + None => vortex_panic!("VariantToJsonArray slot_name index {idx} out of bounds"), + } + } + + fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { + let variant = array.as_ref().slots()[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))?; + let variant = variant.clone().execute::(ctx)?; + vortex_ensure!( + variant.shredded().is_none(), + "VariantToJsonArray can only export unshredded Parquet Variant storage to JSON" + ); + + let parquet_variant = variant + .core_storage() + .as_opt::() + .ok_or_else(|| { + vortex_err!( + "VariantToJsonArray requires Parquet Variant core storage, got {}", + variant.core_storage().encoding_id() + ) + })?; + let arrow_variant = parquet_variant_to_json_arrow(parquet_variant, ctx)?; + let arrow_json = parquet_variant_compute::variant_to_json(&arrow_variant)?; + let storage = ArrayRef::from_arrow(&arrow_json, array.dtype().is_nullable())?; + + Ok(ExecutionResult::done( + ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array(), + )) + } +} + +impl ValidityVTable for VariantToJson { + fn validity(array: ArrayView<'_, VariantToJson>) -> VortexResult { + array.slots()[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))? + .validity() + } +} + +fn parquet_variant_to_json_arrow( + parquet_variant: ArrayView<'_, ParquetVariant>, + ctx: &mut ExecutionCtx, +) -> VortexResult { + vortex_ensure!( + parquet_variant.typed_value_array().is_none(), + "VariantToJsonArray can only export unshredded Parquet Variant storage to JSON" + ); + let value = parquet_variant + .value_array() + .ok_or_else(|| vortex_err!("VariantToJsonArray requires Parquet Variant value storage"))?; + + let metadata_arrow = { + let target = Field::new("", DataType::Binary, false); + let session = ctx.session().clone(); + session.arrow().execute_arrow( + parquet_variant.metadata_array().clone(), + Some(&target), + ctx, + )? + }; + let value_arrow = { + let target = Field::new("", DataType::Binary, value.dtype().is_nullable()); + let session = ctx.session().clone(); + session + .arrow() + .execute_arrow(value.clone(), Some(&target), ctx)? + }; + let fields = vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new( + "value", + DataType::Binary, + value.dtype().is_nullable(), + )), + ]; + let nulls = to_arrow_null_buffer(parquet_variant.validity()?, parquet_variant.len(), ctx)?; + + Ok(Arc::new(ArrowStructArray::try_new( + fields.into(), + vec![metadata_arrow, value_arrow], + nulls, + )?)) +} + impl Scheme for JsonToVariantScheme { fn scheme_name(&self) -> &'static str { "json_to_variant" @@ -172,6 +426,7 @@ mod tests { use rand::rngs::StdRng; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; + use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::Extension; use vortex_array::arrays::ExtensionArray; use vortex_array::arrays::VarBinView; @@ -184,7 +439,6 @@ mod tests { use vortex_compressor::builtins::StringDictScheme; use vortex_fsst::FSST; use vortex_session::VortexSession; - use vortex_zstd::Zstd; use super::*; use crate::schemes::binary::BinaryFSSTScheme; @@ -195,7 +449,6 @@ mod tests { use crate::schemes::integer::SparseScheme; use crate::schemes::integer::ZigZagScheme; use crate::schemes::string::FSSTScheme; - use crate::schemes::string::ZstdScheme; static SESSION: LazyLock = LazyLock::new(|| VortexSession::empty().with::()); @@ -275,6 +528,53 @@ mod tests { Ok(ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array()) } + #[test] + fn variant_to_json_canonicalizes_to_json_extension() -> VortexResult<()> { + let values = vec![ + "0".to_string(), + r#"{"a":32}"#.to_string(), + r#""hello""#.to_string(), + "null".to_string(), + ]; + let source = json_array(&values)?; + let source_ext = source.as_::(); + let storage = source_ext.storage_array().clone(); + + let mut exec_ctx = SESSION.create_execution_ctx(); + let arrow_array = { + let session = exec_ctx.session().clone(); + session + .arrow() + .execute_arrow(storage, None, &mut exec_ctx)? + }; + let arrow_variant = parquet_variant_compute::json_to_variant(&arrow_array)?; + let variant = ParquetVariant::from_arrow_variant(&arrow_variant)?; + + let wrapped = VariantToJson::try_new(variant)?; + assert_eq!(wrapped.dtype(), source.dtype()); + + let json = wrapped + .into_array() + .execute::(&mut exec_ctx)?; + assert!(json.ext_dtype().is::()); + let json_storage = json + .storage_array() + .clone() + .execute::(&mut exec_ctx)?; + let actual = json_storage.with_iterator(|iter| { + iter.map(|value| value.map(<[u8]>::to_vec)) + .collect::>() + }); + let expected = values + .iter() + .map(|value| Some(value.as_bytes().to_vec())) + .collect::>(); + + assert_eq!(actual, expected); + + Ok(()) + } + fn parquet_variant_child_compressor() -> CascadingCompressor { CascadingCompressor::new(vec![ &JsonToVariantScheme, @@ -356,7 +656,7 @@ mod tests { let variant_compressor = parquet_variant_child_compressor(); let mut exec_ctx = SESSION.create_execution_ctx(); let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - let parquet_variant = compressed.clone().downcast::(); + let parquet_variant = compressed.downcast::(); assert!( !parquet_variant.metadata_array().is::(), From c06df54e588e0a539f3f7a32708103b5987c7abb Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 2 Jun 2026 14:19:33 +0100 Subject: [PATCH 4/4] more compressor pieces Signed-off-by: Adam Gutglick --- vortex-btrblocks/src/variant/mod.rs | 78 +++++++++++++++-------------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs index 0b577c387e7..4c76ec843c6 100644 --- a/vortex-btrblocks/src/variant/mod.rs +++ b/vortex-btrblocks/src/variant/mod.rs @@ -407,13 +407,15 @@ impl Scheme for JsonToVariantScheme { }) .transpose()?; - ParquetVariant::try_new( + let variant = ParquetVariant::try_new( parquet_variant.validity()?, compressed_metadata, compressed_value, parquet_variant.typed_value_array().cloned(), - ) - .map(IntoArray::into_array) + )? + .into_array(); + + Ok(VariantToJson::try_new(variant)?.into_array()) } } @@ -427,9 +429,7 @@ mod tests { use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; use vortex_array::accessor::ArrayAccessor; - use vortex_array::arrays::Extension; use vortex_array::arrays::ExtensionArray; - use vortex_array::arrays::VarBinView; use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::extension::ExtensionArrayExt; use vortex_array::session::ArraySession; @@ -437,7 +437,6 @@ mod tests { use vortex_compressor::builtins::IntConstantScheme; use vortex_compressor::builtins::StringConstantScheme; use vortex_compressor::builtins::StringDictScheme; - use vortex_fsst::FSST; use vortex_session::VortexSession; use super::*; @@ -530,15 +529,16 @@ mod tests { #[test] fn variant_to_json_canonicalizes_to_json_extension() -> VortexResult<()> { - let values = vec![ + let values = [ "0".to_string(), r#"{"a":32}"#.to_string(), r#""hello""#.to_string(), "null".to_string(), ]; - let source = json_array(&values)?; - let source_ext = source.as_::(); - let storage = source_ext.storage_array().clone(); + let storage = + VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); + let source = + ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage.clone())?.into_array(); let mut exec_ctx = SESSION.create_execution_ctx(); let arrow_array = { @@ -556,7 +556,8 @@ mod tests { let json = wrapped .into_array() .execute::(&mut exec_ctx)?; - assert!(json.ext_dtype().is::()); + assert_eq!(json.dtype(), source.dtype()); + assert!(json.storage_array().dtype().is_utf8()); let json_storage = json .storage_array() .clone() @@ -590,6 +591,23 @@ mod tests { ]) } + #[test] + fn json_to_variant_scheme_wraps_output_as_json() -> VortexResult<()> { + let array = json_array(&json_data())?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert_eq!(compressed.dtype(), array.dtype()); + + let json = compressed.execute::(&mut exec_ctx)?; + assert_eq!(json.dtype(), array.dtype()); + assert!(json.storage_array().dtype().is_utf8()); + + Ok(()) + } + fn print_comparison_output( array: &ArrayRef, string_compressed: &ArrayRef, @@ -629,13 +647,6 @@ mod tests { let mut exec_ctx = SESSION.create_execution_ctx(); let variant_compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - assert!( - variant_compressed.is::(), - "expected ParquetVariant output, got encoding {} with dtype {} and {} bytes", - variant_compressed.encoding_id(), - variant_compressed.dtype(), - variant_compressed.nbytes() - ); assert!( variant_compressed.nbytes() < string_compressed.nbytes(), "Parquet Variant conversion should compress repeated JSON keys: \ @@ -653,19 +664,21 @@ mod tests { fn recursively_compresses_parquet_variant_binary_children() -> VortexResult<()> { let array: ArrayRef = json_array(&json_data())?; + let mut exec_ctx = SESSION.create_execution_ctx(); + let uncompressed_children = + CascadingCompressor::new(vec![&JsonToVariantScheme]).compress(&array, &mut exec_ctx)?; + let variant_compressor = parquet_variant_child_compressor(); let mut exec_ctx = SESSION.create_execution_ctx(); let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - let parquet_variant = compressed.downcast::(); assert!( - !parquet_variant.metadata_array().is::(), - "expected Parquet Variant metadata child to be compressed, got {}", - parquet_variant.metadata_array().encoding_id(), + compressed.nbytes() < uncompressed_children.nbytes(), + "recursive child compression should reduce Parquet Variant size: compressed={} bytes, uncompressed_children={} bytes", + compressed.nbytes(), + uncompressed_children.nbytes(), ); - assert!(parquet_variant.value_array().is_some()); - assert!(parquet_variant.typed_value_array().is_none()); - + assert_eq!(compressed.dtype(), array.dtype()); Ok(()) } @@ -689,7 +702,6 @@ mod tests { let mut exec_ctx = SESSION.create_execution_ctx(); let with_binary_fsst = parquet_variant_child_compressor().compress(&array, &mut exec_ctx)?; - let parquet_variant = with_binary_fsst.clone().downcast::(); assert!( with_binary_fsst.nbytes() < without_binary_fsst.nbytes(), @@ -697,16 +709,6 @@ mod tests { with_binary_fsst.nbytes(), without_binary_fsst.nbytes(), ); - assert!( - parquet_variant - .value_array() - .is_some_and(|value| value.is::()), - "expected Parquet Variant value child to use binary FSST, got {}", - parquet_variant.value_array().map_or_else( - || "missing".to_string(), - |value| value.encoding_id().to_string() - ), - ); Ok(()) } @@ -732,9 +734,9 @@ mod tests { let variant_compressor = CascadingCompressor::new(vec![ &JsonToVariantScheme, &BinaryDictScheme, - &FSSTScheme, + // &FSSTScheme, &BinaryFSSTScheme, - // &ZstdScheme, + // &crate::schemes::binary::BinaryZstdScheme, &IntConstantScheme, &StringConstantScheme, &FoRScheme,