diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e6d1ebbbbe746..42bad9d9bfa35 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -709,134 +709,50 @@ config_namespace! { } } -/// Options for content-defined chunking (CDC) when writing parquet files. -/// See [`ParquetOptions::use_content_defined_chunking`]. -/// -/// Can be enabled with default options by setting -/// `use_content_defined_chunking` to `true`, or configured with sub-fields -/// like `use_content_defined_chunking.min_chunk_size`. -#[derive(Debug, Clone, PartialEq)] -pub struct CdcOptions { - /// Minimum chunk size in bytes. The rolling hash will not trigger a split - /// until this many bytes have been accumulated. Default is 256 KiB. - pub min_chunk_size: usize, +config_namespace! { + /// Options for content-defined chunking (CDC) when writing parquet files. + /// Mirrors `parquet::file::properties::CdcOptions`. + /// + /// Carried as a [`CdcOptions`] in [`ParquetOptions::content_defined_chunking`] + /// with an explicit `enabled` flag, so it can be toggled with dotted config + /// keys (`content_defined_chunking.enabled = true|false`) and the result is + /// independent of the order in which the keys are set. + pub struct CdcOptions { + /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing + /// parquet files. When enabled, parallel writing is automatically disabled + /// since the chunker state must persist across row groups. + pub enabled: bool, default = false - /// Maximum chunk size in bytes. A split is forced when the accumulated - /// size exceeds this value. Default is 1 MiB. - pub max_chunk_size: usize, + /// Minimum chunk size in bytes. The rolling hash will not trigger a split + /// until this many bytes have been accumulated. Default is 256 KiB. + pub min_chunk_size: usize, default = 256 * 1024 - /// Normalization level. Increasing this improves deduplication ratio - /// but increases fragmentation. Recommended range is [-3, 3], default is 0. - pub norm_level: i32, -} + /// Maximum chunk size in bytes. A split is forced when the accumulated + /// size exceeds this value. Default is 1 MiB. + pub max_chunk_size: usize, default = 1024 * 1024 -// Note: `CdcOptions` intentionally does NOT implement `Default` so that the -// blanket `impl ConfigField for Option` does not -// apply. This allows the specific `impl ConfigField for Option` -// below to handle "true"/"false" for enabling/disabling CDC. -// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`. -impl CdcOptions { - /// Returns a new `CdcOptions` with default values. - #[expect(clippy::should_implement_trait)] - pub fn default() -> Self { - Self { - min_chunk_size: 256 * 1024, - max_chunk_size: 1024 * 1024, - norm_level: 0, - } + /// Normalization level. Increasing this improves deduplication ratio + /// but increases fragmentation. Recommended range is [-3, 3], default is 0. + pub norm_level: i32, default = 0 } } -impl ConfigField for CdcOptions { - fn set(&mut self, key: &str, value: &str) -> Result<()> { - let (key, rem) = key.split_once('.').unwrap_or((key, "")); - match key { - "min_chunk_size" => self.min_chunk_size.set(rem, value), - "max_chunk_size" => self.max_chunk_size.set(rem, value), - "norm_level" => self.norm_level.set(rem, value), - _ => _config_err!("Config value \"{}\" not found on CdcOptions", key), - } - } - - fn visit(&self, v: &mut V, key_prefix: &str, _description: &'static str) { - let key = format!("{key_prefix}.min_chunk_size"); - self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB."); - let key = format!("{key_prefix}.max_chunk_size"); - self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB."); - let key = format!("{key_prefix}.norm_level"); - self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0."); - } - - fn reset(&mut self, key: &str) -> Result<()> { - let (key, rem) = key.split_once('.').unwrap_or((key, "")); - match key { - "min_chunk_size" => { - if rem.is_empty() { - self.min_chunk_size = CdcOptions::default().min_chunk_size; - Ok(()) - } else { - self.min_chunk_size.reset(rem) - } - } - "max_chunk_size" => { - if rem.is_empty() { - self.max_chunk_size = CdcOptions::default().max_chunk_size; - Ok(()) - } else { - self.max_chunk_size.reset(rem) - } - } - "norm_level" => { - if rem.is_empty() { - self.norm_level = CdcOptions::default().norm_level; - Ok(()) - } else { - self.norm_level.reset(rem) - } - } - _ => _config_err!("Config value \"{}\" not found on CdcOptions", key), - } - } -} - -/// `ConfigField` for `Option` — allows setting the option to -/// `"true"` (enable with defaults) or `"false"` (disable), in addition to -/// setting individual sub-fields like `min_chunk_size`. -impl ConfigField for Option { - fn visit(&self, v: &mut V, key: &str, description: &'static str) { - match self { - Some(s) => s.visit(v, key, description), - None => v.none(key, description), - } - } - - fn set(&mut self, key: &str, value: &str) -> Result<()> { - if key.is_empty() { - match value.to_ascii_lowercase().as_str() { - "true" => { - *self = Some(CdcOptions::default()); - Ok(()) - } - "false" => { - *self = None; - Ok(()) - } - _ => _config_err!( - "Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'" - ), - } - } else { - self.get_or_insert_with(CdcOptions::default).set(key, value) +impl CdcOptions { + /// Returns enabled CDC options with the default chunking parameters. + /// + /// Shorthand for `CdcOptions { enabled: true, ..Default::default() }`; combine + /// with struct-update syntax to override parameters, e.g. + /// `CdcOptions { min_chunk_size: 4096, ..CdcOptions::enabled() }`. + pub fn enabled() -> Self { + Self { + enabled: true, + ..Default::default() } } - fn reset(&mut self, key: &str) -> Result<()> { - if key.is_empty() { - *self = None; - Ok(()) - } else { - self.get_or_insert_with(CdcOptions::default).reset(key) - } + /// Returns disabled CDC options (equivalent to [`CdcOptions::default`]). + pub fn disabled() -> Self { + Self::default() } } @@ -1036,11 +952,14 @@ config_namespace! { /// data frame. pub maximum_buffered_record_batches_per_stream: usize, default = 2 - /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing - /// parquet files. When `Some`, CDC is enabled with the given options; when `None` - /// (the default), CDC is disabled. When CDC is enabled, parallel writing is - /// automatically disabled since the chunker state must persist across row groups. - pub use_content_defined_chunking: Option, default = None + /// (writing) EXPERIMENTAL: Content-defined chunking (CDC) options when writing + /// parquet files. Disabled by default; toggle with + /// `content_defined_chunking.enabled = true|false`. The chunking parameters live + /// under the same prefix (e.g. `content_defined_chunking.min_chunk_size`). When + /// enabled, parallel writing is automatically disabled since the chunker state + /// must persist across row groups. Mirrors + /// `parquet::file::properties::WriterProperties::content_defined_chunking`. + pub content_defined_chunking: CdcOptions, default = Default::default() } } @@ -4020,73 +3939,54 @@ mod tests { #[cfg(feature = "parquet")] #[test] - fn set_cdc_option_with_boolean_true() { + fn set_cdc_enabled_flag() { use crate::config::ConfigOptions; let mut config = ConfigOptions::default(); - assert!( - config - .execution - .parquet - .use_content_defined_chunking - .is_none() - ); + // CDC is disabled by default. + assert!(!config.execution.parquet.content_defined_chunking.enabled); - // Setting to "true" should enable CDC with default options + // `.enabled = true` enables CDC; parameters keep their defaults. config .set( - "datafusion.execution.parquet.use_content_defined_chunking", + "datafusion.execution.parquet.content_defined_chunking.enabled", "true", ) .unwrap(); - let cdc = config - .execution - .parquet - .use_content_defined_chunking - .as_ref() - .expect("CDC should be enabled"); + let cdc = &config.execution.parquet.content_defined_chunking; + assert!(cdc.enabled); assert_eq!(cdc.min_chunk_size, 256 * 1024); assert_eq!(cdc.max_chunk_size, 1024 * 1024); assert_eq!(cdc.norm_level, 0); - // Setting to "false" should disable CDC + // `.enabled = false` disables CDC. config .set( - "datafusion.execution.parquet.use_content_defined_chunking", + "datafusion.execution.parquet.content_defined_chunking.enabled", "false", ) .unwrap(); - assert!( - config - .execution - .parquet - .use_content_defined_chunking - .is_none() - ); + assert!(!config.execution.parquet.content_defined_chunking.enabled); } #[cfg(feature = "parquet")] #[test] - fn set_cdc_option_with_subfields() { + fn set_cdc_param_does_not_enable() { use crate::config::ConfigOptions; let mut config = ConfigOptions::default(); - // Setting sub-fields should also enable CDC + // Setting a parameter does NOT enable CDC (`enabled` is a distinct field, + // defaulting to false), and the result is independent of key order. config .set( - "datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size", + "datafusion.execution.parquet.content_defined_chunking.min_chunk_size", "1024", ) .unwrap(); - let cdc = config - .execution - .parquet - .use_content_defined_chunking - .as_ref() - .expect("CDC should be enabled"); + let cdc = &config.execution.parquet.content_defined_chunking; + assert!(!cdc.enabled); assert_eq!(cdc.min_chunk_size, 1024); - // Other fields should be defaults assert_eq!(cdc.max_chunk_size, 1024 * 1024); assert_eq!(cdc.norm_level, 0); } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 3f827fbfa75a0..9142e46a73c64 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -191,7 +191,7 @@ impl ParquetOptions { bloom_filter_on_write, bloom_filter_fpp, bloom_filter_ndv, - use_content_defined_chunking, + content_defined_chunking, // not in WriterProperties enable_page_index: _, @@ -249,7 +249,8 @@ impl ParquetOptions { if let Some(encoding) = encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); } - if let Some(cdc) = use_content_defined_chunking { + if content_defined_chunking.enabled { + let cdc = content_defined_chunking; if cdc.min_chunk_size == 0 { return Err(DataFusionError::Configuration( "CDC min_chunk_size must be greater than 0".to_string(), @@ -485,7 +486,7 @@ mod tests { coerce_int96: None, coerce_int96_tz: None, max_predicate_cache_size: defaults.max_predicate_cache_size, - use_content_defined_chunking: defaults.use_content_defined_chunking.clone(), + content_defined_chunking: defaults.content_defined_chunking.clone(), } } @@ -603,13 +604,15 @@ mod tests { skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, coerce_int96_tz: None, - use_content_defined_chunking: props.content_defined_chunking().map(|c| { - CdcOptions { + content_defined_chunking: props + .content_defined_chunking() + .map(|c| CdcOptions { + enabled: true, min_chunk_size: c.min_chunk_size, max_chunk_size: c.max_chunk_size, norm_level: c.norm_level, - } - }), + }) + .unwrap_or_default(), }, column_specific_options, key_value_metadata, @@ -823,11 +826,12 @@ mod tests { #[test] fn test_cdc_enabled_with_custom_options() { let mut opts = TableParquetOptions::default(); - opts.global.use_content_defined_chunking = Some(CdcOptions { + opts.global.content_defined_chunking = CdcOptions { + enabled: true, min_chunk_size: 128 * 1024, max_chunk_size: 512 * 1024, norm_level: 2, - }); + }; opts.arrow_schema(&Arc::new(Schema::empty())); let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); @@ -846,20 +850,38 @@ mod tests { assert!(props.content_defined_chunking().is_none()); } + #[test] + fn test_cdc_params_ignored_when_disabled() { + // Parameters are customized but `enabled` is false, so CDC stays off. + let mut opts = TableParquetOptions::default(); + opts.global.content_defined_chunking = CdcOptions { + enabled: false, + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }; + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + assert!(props.content_defined_chunking().is_none()); + } + #[test] fn test_cdc_round_trip_through_writer_props() { let mut opts = TableParquetOptions::default(); - opts.global.use_content_defined_chunking = Some(CdcOptions { + opts.global.content_defined_chunking = CdcOptions { + enabled: true, min_chunk_size: 64 * 1024, max_chunk_size: 2 * 1024 * 1024, norm_level: -1, - }); + }; opts.arrow_schema(&Arc::new(Schema::empty())); let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); let recovered = session_config_from_writer_props(&props); - let cdc = recovered.global.use_content_defined_chunking.unwrap(); + let cdc = recovered.global.content_defined_chunking; + assert!(cdc.enabled); assert_eq!(cdc.min_chunk_size, 64 * 1024); assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); assert_eq!(cdc.norm_level, -1); @@ -868,10 +890,11 @@ mod tests { #[test] fn test_cdc_validation_zero_min_chunk_size() { let mut opts = TableParquetOptions::default(); - opts.global.use_content_defined_chunking = Some(CdcOptions { + opts.global.content_defined_chunking = CdcOptions { + enabled: true, min_chunk_size: 0, ..CdcOptions::default() - }); + }; opts.arrow_schema(&Arc::new(Schema::empty())); assert!(WriterPropertiesBuilder::try_from(&opts).is_err()); } @@ -879,11 +902,12 @@ mod tests { #[test] fn test_cdc_validation_max_not_greater_than_min() { let mut opts = TableParquetOptions::default(); - opts.global.use_content_defined_chunking = Some(CdcOptions { + opts.global.content_defined_chunking = CdcOptions { + enabled: true, min_chunk_size: 512 * 1024, max_chunk_size: 256 * 1024, ..CdcOptions::default() - }); + }; opts.arrow_schema(&Arc::new(Schema::empty())); assert!(WriterPropertiesBuilder::try_from(&opts).is_err()); } diff --git a/datafusion/core/tests/parquet/content_defined_chunking.rs b/datafusion/core/tests/parquet/content_defined_chunking.rs index 6a98ded1bd4cf..4a8615a875262 100644 --- a/datafusion/core/tests/parquet/content_defined_chunking.rs +++ b/datafusion/core/tests/parquet/content_defined_chunking.rs @@ -97,7 +97,7 @@ async fn cdc_data_round_trip() { let batch = make_test_batch(5000); let mut opts = TableParquetOptions::default(); - opts.global.use_content_defined_chunking = Some(CdcOptions::default()); + opts.global.content_defined_chunking = CdcOptions::enabled(); let props = writer_props(&mut opts, &batch.schema()); let tmp = write_parquet_file(&batch, props); @@ -145,11 +145,12 @@ async fn cdc_affects_page_boundaries() { // Write WITH CDC using small chunk sizes to maximize effect let mut cdc_opts = TableParquetOptions::default(); - cdc_opts.global.use_content_defined_chunking = Some(CdcOptions { + cdc_opts.global.content_defined_chunking = CdcOptions { + enabled: true, min_chunk_size: 512, max_chunk_size: 2048, norm_level: 0, - }); + }; let cdc_file = write_parquet_file(&batch, writer_props(&mut cdc_opts, &batch.schema())); let cdc_meta = read_metadata(&cdc_file); diff --git a/datafusion/datasource-parquet/src/sink.rs b/datafusion/datasource-parquet/src/sink.rs index a73be8d2e68cf..f15f67aab0a87 100644 --- a/datafusion/datasource-parquet/src/sink.rs +++ b/datafusion/datasource-parquet/src/sink.rs @@ -294,7 +294,7 @@ impl FileSink for ParquetSink { // CDC requires the sequential writer: the chunker state lives in ArrowWriter // and persists across row groups. The parallel path bypasses ArrowWriter entirely. if !parquet_opts.global.allow_single_file_parallelism - || parquet_opts.global.use_content_defined_chunking.is_some() + || parquet_opts.global.content_defined_chunking.enabled { let mut writer = self .create_async_arrow_writer( diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 684d9a2612408..eaaef84f2e821 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -639,9 +639,10 @@ message ParquetOptions { } message CdcOptions { - uint64 min_chunk_size = 1; - uint64 max_chunk_size = 2; - int32 norm_level = 3; + bool enabled = 1; + uint64 min_chunk_size = 2; + uint64 max_chunk_size = 3; + int32 norm_level = 4; } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 94a06bcc13bbd..b48a279de2297 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1130,17 +1130,12 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), - use_content_defined_chunking: value.content_defined_chunking.map(|cdc| { - let defaults = CdcOptions::default(); - CdcOptions { - // proto3 uses 0 as the wire default for uint64; a zero chunk size is - // invalid, so treat it as "field not set" and fall back to the default. - min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, - max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, - // norm_level = 0 is a valid value (and the default), so pass it through directly. - norm_level: cdc.norm_level, - } - }), + content_defined_chunking: value.content_defined_chunking.map(|cdc| CdcOptions { + enabled: cdc.enabled, + min_chunk_size: cdc.min_chunk_size as usize, + max_chunk_size: cdc.max_chunk_size as usize, + norm_level: cdc.norm_level, + }).unwrap_or_default(), }) } } @@ -1348,7 +1343,7 @@ mod tests { #[test] fn test_parquet_options_cdc_disabled_round_trip() { let opts = ParquetOptions::default(); - assert!(opts.use_content_defined_chunking.is_none()); + assert!(!opts.content_defined_chunking.enabled); let recovered = parquet_options_proto_round_trip(opts.clone()); assert_eq!(opts, recovered); } @@ -1389,15 +1384,17 @@ mod tests { #[test] fn test_parquet_options_cdc_enabled_round_trip() { let opts = ParquetOptions { - use_content_defined_chunking: Some(CdcOptions { + content_defined_chunking: CdcOptions { + enabled: true, min_chunk_size: 128 * 1024, max_chunk_size: 512 * 1024, norm_level: 2, - }), + }, ..ParquetOptions::default() }; let recovered = parquet_options_proto_round_trip(opts.clone()); - let cdc = recovered.use_content_defined_chunking.unwrap(); + let cdc = recovered.content_defined_chunking; + assert!(cdc.enabled); assert_eq!(cdc.min_chunk_size, 128 * 1024); assert_eq!(cdc.max_chunk_size, 512 * 1024); assert_eq!(cdc.norm_level, 2); @@ -1406,30 +1403,30 @@ mod tests { #[test] fn test_parquet_options_cdc_negative_norm_level_round_trip() { let opts = ParquetOptions { - use_content_defined_chunking: Some(CdcOptions { + content_defined_chunking: CdcOptions { + enabled: true, norm_level: -3, ..CdcOptions::default() - }), + }, ..ParquetOptions::default() }; let recovered = parquet_options_proto_round_trip(opts); - assert_eq!( - recovered.use_content_defined_chunking.unwrap().norm_level, - -3 - ); + assert_eq!(recovered.content_defined_chunking.norm_level, -3); } #[test] fn test_table_parquet_options_cdc_round_trip() { let mut opts = TableParquetOptions::default(); - opts.global.use_content_defined_chunking = Some(CdcOptions { + opts.global.content_defined_chunking = CdcOptions { + enabled: true, min_chunk_size: 64 * 1024, max_chunk_size: 2 * 1024 * 1024, norm_level: -1, - }); + }; let recovered = table_parquet_options_proto_round_trip(opts.clone()); - let cdc = recovered.global.use_content_defined_chunking.unwrap(); + let cdc = recovered.global.content_defined_chunking; + assert!(cdc.enabled); assert_eq!(cdc.min_chunk_size, 64 * 1024); assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); assert_eq!(cdc.norm_level, -1); @@ -1438,8 +1435,8 @@ mod tests { #[test] fn test_table_parquet_options_cdc_disabled_round_trip() { let opts = TableParquetOptions::default(); - assert!(opts.global.use_content_defined_chunking.is_none()); + assert!(!opts.global.content_defined_chunking.enabled); let recovered = table_parquet_options_proto_round_trip(opts.clone()); - assert!(recovered.global.use_content_defined_chunking.is_none()); + assert!(!recovered.global.content_defined_chunking.enabled); } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 0568982e97a44..e3bf115de9a53 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -919,6 +919,9 @@ impl serde::Serialize for CdcOptions { { use serde::ser::SerializeStruct; let mut len = 0; + if self.enabled { + len += 1; + } if self.min_chunk_size != 0 { len += 1; } @@ -929,6 +932,9 @@ impl serde::Serialize for CdcOptions { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion_common.CdcOptions", len)?; + if self.enabled { + struct_ser.serialize_field("enabled", &self.enabled)?; + } if self.min_chunk_size != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -952,6 +958,7 @@ impl<'de> serde::Deserialize<'de> for CdcOptions { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ + "enabled", "min_chunk_size", "minChunkSize", "max_chunk_size", @@ -962,6 +969,7 @@ impl<'de> serde::Deserialize<'de> for CdcOptions { #[allow(clippy::enum_variant_names)] enum GeneratedField { + Enabled, MinChunkSize, MaxChunkSize, NormLevel, @@ -986,6 +994,7 @@ impl<'de> serde::Deserialize<'de> for CdcOptions { E: serde::de::Error, { match value { + "enabled" => Ok(GeneratedField::Enabled), "minChunkSize" | "min_chunk_size" => Ok(GeneratedField::MinChunkSize), "maxChunkSize" | "max_chunk_size" => Ok(GeneratedField::MaxChunkSize), "normLevel" | "norm_level" => Ok(GeneratedField::NormLevel), @@ -1008,11 +1017,18 @@ impl<'de> serde::Deserialize<'de> for CdcOptions { where V: serde::de::MapAccess<'de>, { + let mut enabled__ = None; let mut min_chunk_size__ = None; let mut max_chunk_size__ = None; let mut norm_level__ = None; while let Some(k) = map_.next_key()? { match k { + GeneratedField::Enabled => { + if enabled__.is_some() { + return Err(serde::de::Error::duplicate_field("enabled")); + } + enabled__ = Some(map_.next_value()?); + } GeneratedField::MinChunkSize => { if min_chunk_size__.is_some() { return Err(serde::de::Error::duplicate_field("minChunkSize")); @@ -1040,6 +1056,7 @@ impl<'de> serde::Deserialize<'de> for CdcOptions { } } Ok(CdcOptions { + enabled: enabled__.unwrap_or_default(), min_chunk_size: min_chunk_size__.unwrap_or_default(), max_chunk_size: max_chunk_size__.unwrap_or_default(), norm_level: norm_level__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 632b16929faa6..d333dfe833efd 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -976,11 +976,13 @@ pub mod parquet_options { } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct CdcOptions { - #[prost(uint64, tag = "1")] - pub min_chunk_size: u64, + #[prost(bool, tag = "1")] + pub enabled: bool, #[prost(uint64, tag = "2")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "3")] pub max_chunk_size: u64, - #[prost(int32, tag = "3")] + #[prost(int32, tag = "4")] pub norm_level: i32, } #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 940679b836ff1..5e5b4448f9705 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -938,13 +938,12 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), coerce_int96_tz_opt: value.coerce_int96_tz.clone().map(protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), - content_defined_chunking: value.use_content_defined_chunking.as_ref().map(|cdc| - protobuf::CdcOptions { - min_chunk_size: cdc.min_chunk_size as u64, - max_chunk_size: cdc.max_chunk_size as u64, - norm_level: cdc.norm_level, - } - ), + content_defined_chunking: Some(protobuf::CdcOptions { + enabled: value.content_defined_chunking.enabled, + min_chunk_size: value.content_defined_chunking.min_chunk_size as u64, + max_chunk_size: value.content_defined_chunking.max_chunk_size as u64, + norm_level: value.content_defined_chunking.norm_level, + }), }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 632b16929faa6..d333dfe833efd 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -976,11 +976,13 @@ pub mod parquet_options { } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct CdcOptions { - #[prost(uint64, tag = "1")] - pub min_chunk_size: u64, + #[prost(bool, tag = "1")] + pub enabled: bool, #[prost(uint64, tag = "2")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "3")] pub max_chunk_size: u64, - #[prost(int32, tag = "3")] + #[prost(int32, tag = "4")] pub norm_level: i32, } #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 683b6a612a53f..fed240a09cdfb 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -453,12 +453,11 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), - content_defined_chunking: global_options.global.use_content_defined_chunking.as_ref().map(|cdc| { - CdcOptionsProto { - min_chunk_size: cdc.min_chunk_size as u64, - max_chunk_size: cdc.max_chunk_size as u64, - norm_level: cdc.norm_level, - } + content_defined_chunking: Some(CdcOptionsProto { + enabled: global_options.global.content_defined_chunking.enabled, + min_chunk_size: global_options.global.content_defined_chunking.min_chunk_size as u64, + max_chunk_size: global_options.global.content_defined_chunking.max_chunk_size as u64, + norm_level: global_options.global.content_defined_chunking.norm_level, }), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { @@ -562,17 +561,12 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), - use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| { - let defaults = CdcOptions::default(); - CdcOptions { - // proto3 uses 0 as the wire default for uint64; a zero chunk size is - // invalid, so treat it as "field not set" and fall back to the default. - min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, - max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, - // norm_level = 0 is a valid value (and the default), so pass it through directly. - norm_level: cdc.norm_level, - } - }), + content_defined_chunking: proto.content_defined_chunking.map(|cdc| CdcOptions { + enabled: cdc.enabled, + min_chunk_size: cdc.min_chunk_size as usize, + max_chunk_size: cdc.max_chunk_size as usize, + norm_level: cdc.norm_level, + }).unwrap_or_default(), } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 1aa9bc79e5bbe..cb31405daf0c2 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -239,6 +239,10 @@ datafusion.execution.parquet.coerce_int96 NULL datafusion.execution.parquet.coerce_int96_tz NULL datafusion.execution.parquet.column_index_truncate_length 64 datafusion.execution.parquet.compression zstd(3) +datafusion.execution.parquet.content_defined_chunking.enabled false +datafusion.execution.parquet.content_defined_chunking.max_chunk_size 1048576 +datafusion.execution.parquet.content_defined_chunking.min_chunk_size 262144 +datafusion.execution.parquet.content_defined_chunking.norm_level 0 datafusion.execution.parquet.created_by datafusion datafusion.execution.parquet.data_page_row_count_limit 20000 datafusion.execution.parquet.data_pagesize_limit 1048576 @@ -260,7 +264,6 @@ datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.statistics_truncate_length 64 -datafusion.execution.parquet.use_content_defined_chunking NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.perfect_hash_join_min_key_density 0.15 @@ -389,6 +392,10 @@ datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader datafusion.execution.parquet.coerce_int96_tz NULL (reading) Optional timezone applied to INT96 columns when `coerce_int96` is set. When `Some`, INT96 columns coerce to `Timestamp(, Some())` instead of the default `Timestamp(, None)`. Spark and other systems write INT96 values as UTC-adjusted instants, so callers that need the resulting Arrow type to be timezone-aware (e.g. for Spark `TimestampType` semantics) should set this to `"UTC"`. No effect when `coerce_int96` is `None`. datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. +datafusion.execution.parquet.content_defined_chunking.enabled false (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. +datafusion.execution.parquet.content_defined_chunking.max_chunk_size 1048576 Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB. +datafusion.execution.parquet.content_defined_chunking.min_chunk_size 262144 Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB. +datafusion.execution.parquet.content_defined_chunking.norm_level 0 Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0. datafusion.execution.parquet.created_by datafusion (writing) Sets "created by" property datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best effort maximum number of rows in data page datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes @@ -410,7 +417,6 @@ datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding t datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting -datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. diff --git a/datafusion/sqllogictest/test_files/parquet_cdc.slt b/datafusion/sqllogictest/test_files/parquet_cdc.slt index f87f05af74a0c..bc9b3aeaeae07 100644 --- a/datafusion/sqllogictest/test_files/parquet_cdc.slt +++ b/datafusion/sqllogictest/test_files/parquet_cdc.slt @@ -28,14 +28,15 @@ CREATE TABLE cdc_source AS VALUES (5, 'eve', 500.99) # -# Test 1: Enable CDC with 'true' (uses default options) +# Test 1: Enable CDC with the explicit `content_defined_chunking.enabled` key +# (uses default chunking parameters). # query I COPY cdc_source TO 'test_files/scratch/parquet_cdc/enabled_true/' STORED AS PARQUET OPTIONS ( - 'format.use_content_defined_chunking' 'true' + 'format.content_defined_chunking.enabled' 'true' ) ---- 5 @@ -68,15 +69,14 @@ SELECT SUM(column3) FROM cdc_enabled_true_read 1502.49 # -# Test 2: Disable CDC with 'false' (same as default behavior) +# Test 2: CDC is disabled by default (no content_defined_chunking options set). +# It can also be turned off explicitly with +# `content_defined_chunking.enabled` = 'false'. # query I COPY cdc_source TO 'test_files/scratch/parquet_cdc/disabled_false/' STORED AS PARQUET -OPTIONS ( - 'format.use_content_defined_chunking' 'false' -) ---- 5 @@ -95,16 +95,17 @@ SELECT * FROM cdc_disabled_false_read 5 eve 500.99 # -# Test 3: Enable CDC with custom sub-field options +# Test 3: Enable CDC with custom chunking parameters # query I COPY cdc_source TO 'test_files/scratch/parquet_cdc/custom_chunks/' STORED AS PARQUET OPTIONS ( - 'format.use_content_defined_chunking.min_chunk_size' '1024', - 'format.use_content_defined_chunking.max_chunk_size' '4096', - 'format.use_content_defined_chunking.norm_level' '1' + 'format.content_defined_chunking.enabled' 'true', + 'format.content_defined_chunking.min_chunk_size' '1024', + 'format.content_defined_chunking.max_chunk_size' '4096', + 'format.content_defined_chunking.norm_level' '1' ) ---- 5 @@ -135,7 +136,7 @@ CREATE EXTERNAL TABLE cdc_external_write ( ) STORED AS PARQUET LOCATION 'test_files/scratch/parquet_cdc/external_table/' OPTIONS ( - 'format.use_content_defined_chunking' 'true' + 'format.content_defined_chunking.enabled' 'true' ) query I @@ -169,7 +170,7 @@ query I COPY cdc_large_source TO 'test_files/scratch/parquet_cdc/large/' STORED AS PARQUET OPTIONS ( - 'format.use_content_defined_chunking' 'true' + 'format.content_defined_chunking.enabled' 'true' ) ---- 1000 @@ -213,7 +214,7 @@ query I COPY cdc_types_source TO 'test_files/scratch/parquet_cdc/types/' STORED AS PARQUET OPTIONS ( - 'format.use_content_defined_chunking' 'true' + 'format.content_defined_chunking.enabled' 'true' ) ---- 3 diff --git a/datafusion/sqllogictest/test_files/parquet_cdc_config.slt b/datafusion/sqllogictest/test_files/parquet_cdc_config.slt new file mode 100644 index 0000000000000..2e2b3a5d2ca0b --- /dev/null +++ b/datafusion/sqllogictest/test_files/parquet_cdc_config.slt @@ -0,0 +1,64 @@ +# Content-defined chunking (CDC) config resolution. +# +# CDC is a plain CdcOptions struct with an explicit `enabled` flag, so toggling +# `content_defined_chunking.enabled` is independent of the chunking parameters +# and of the order in which keys are set. There is no bare boolean form. + +statement ok +SET datafusion.catalog.information_schema = true + +# Disabled by default: enabled=false, parameters at their defaults. +query TT rowsort +SELECT name, value FROM information_schema.df_settings +WHERE name LIKE '%content_defined_chunking%' +---- +datafusion.execution.parquet.content_defined_chunking.enabled false +datafusion.execution.parquet.content_defined_chunking.max_chunk_size 1048576 +datafusion.execution.parquet.content_defined_chunking.min_chunk_size 262144 +datafusion.execution.parquet.content_defined_chunking.norm_level 0 + +# Setting a parameter does NOT enable CDC: `enabled` stays false. +statement ok +SET datafusion.execution.parquet.content_defined_chunking.min_chunk_size = 2048 + +query TT rowsort +SELECT name, value FROM information_schema.df_settings +WHERE name LIKE '%content_defined_chunking%' +---- +datafusion.execution.parquet.content_defined_chunking.enabled false +datafusion.execution.parquet.content_defined_chunking.max_chunk_size 1048576 +datafusion.execution.parquet.content_defined_chunking.min_chunk_size 2048 +datafusion.execution.parquet.content_defined_chunking.norm_level 0 + +# Enabling is explicit and independent of the parameters already set. +statement ok +SET datafusion.execution.parquet.content_defined_chunking.enabled = true + +query TT rowsort +SELECT name, value FROM information_schema.df_settings +WHERE name LIKE '%content_defined_chunking%' +---- +datafusion.execution.parquet.content_defined_chunking.enabled true +datafusion.execution.parquet.content_defined_chunking.max_chunk_size 1048576 +datafusion.execution.parquet.content_defined_chunking.min_chunk_size 2048 +datafusion.execution.parquet.content_defined_chunking.norm_level 0 + +# Disabling only flips the flag; the parameters are left untouched. +statement ok +SET datafusion.execution.parquet.content_defined_chunking.enabled = false + +query TT rowsort +SELECT name, value FROM information_schema.df_settings +WHERE name LIKE '%content_defined_chunking%' +---- +datafusion.execution.parquet.content_defined_chunking.enabled false +datafusion.execution.parquet.content_defined_chunking.max_chunk_size 1048576 +datafusion.execution.parquet.content_defined_chunking.min_chunk_size 2048 +datafusion.execution.parquet.content_defined_chunking.norm_level 0 + +# Restore defaults so the harness does not see modified configuration. +statement ok +SET datafusion.execution.parquet.content_defined_chunking.min_chunk_size = 262144 + +statement ok +SET datafusion.catalog.information_schema = false diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index e1626882c3d21..c1f3258ccc005 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -113,7 +113,10 @@ The following configuration settings are available: | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | -| datafusion.execution.parquet.use_content_defined_chunking | NULL | (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. | +| datafusion.execution.parquet.content_defined_chunking.enabled | false | (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. | +| datafusion.execution.parquet.content_defined_chunking.min_chunk_size | 262144 | Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB. | +| datafusion.execution.parquet.content_defined_chunking.max_chunk_size | 1048576 | Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB. | +| datafusion.execution.parquet.content_defined_chunking.norm_level | 0 | Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. |