Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 59 additions & 159 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,134 +709,50 @@ config_namespace! {
}
}

/// Options for content-defined chunking (CDC) when writing parquet files.
/// See [`ParquetOptions::use_content_defined_chunking`].
///
/// Can be enabled with default options by setting
/// `use_content_defined_chunking` to `true`, or configured with sub-fields
/// like `use_content_defined_chunking.min_chunk_size`.
#[derive(Debug, Clone, PartialEq)]
pub struct CdcOptions {
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
/// until this many bytes have been accumulated. Default is 256 KiB.
pub min_chunk_size: usize,
config_namespace! {
/// Options for content-defined chunking (CDC) when writing parquet files.
/// Mirrors `parquet::file::properties::CdcOptions`.
///
/// Carried as a [`CdcOptions`] in [`ParquetOptions::content_defined_chunking`]
/// with an explicit `enabled` flag, so it can be toggled with dotted config
/// keys (`content_defined_chunking.enabled = true|false`) and the result is
/// independent of the order in which the keys are set.
pub struct CdcOptions {
/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
/// parquet files. When enabled, parallel writing is automatically disabled
/// since the chunker state must persist across row groups.
pub enabled: bool, default = false

/// Maximum chunk size in bytes. A split is forced when the accumulated
/// size exceeds this value. Default is 1 MiB.
pub max_chunk_size: usize,
/// Minimum chunk size in bytes. The rolling hash will not trigger a split
/// until this many bytes have been accumulated. Default is 256 KiB.
pub min_chunk_size: usize, default = 256 * 1024

/// Normalization level. Increasing this improves deduplication ratio
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
pub norm_level: i32,
}
/// Maximum chunk size in bytes. A split is forced when the accumulated
/// size exceeds this value. Default is 1 MiB.
pub max_chunk_size: usize, default = 1024 * 1024

// Note: `CdcOptions` intentionally does NOT implement `Default` so that the
// blanket `impl<F: ConfigField + Default> ConfigField for Option<F>` does not
// apply. This allows the specific `impl ConfigField for Option<CdcOptions>`
// below to handle "true"/"false" for enabling/disabling CDC.
// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`.
impl CdcOptions {
/// Returns a new `CdcOptions` with default values.
#[expect(clippy::should_implement_trait)]
pub fn default() -> Self {
Self {
min_chunk_size: 256 * 1024,
max_chunk_size: 1024 * 1024,
norm_level: 0,
}
/// Normalization level. Increasing this improves deduplication ratio
/// but increases fragmentation. Recommended range is [-3, 3], default is 0.
pub norm_level: i32, default = 0
}
}

impl ConfigField for CdcOptions {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
"min_chunk_size" => self.min_chunk_size.set(rem, value),
"max_chunk_size" => self.max_chunk_size.set(rem, value),
"norm_level" => self.norm_level.set(rem, value),
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
}
}

fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
let key = format!("{key_prefix}.min_chunk_size");
self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB.");
let key = format!("{key_prefix}.max_chunk_size");
self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB.");
let key = format!("{key_prefix}.norm_level");
self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0.");
}

fn reset(&mut self, key: &str) -> Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
"min_chunk_size" => {
if rem.is_empty() {
self.min_chunk_size = CdcOptions::default().min_chunk_size;
Ok(())
} else {
self.min_chunk_size.reset(rem)
}
}
"max_chunk_size" => {
if rem.is_empty() {
self.max_chunk_size = CdcOptions::default().max_chunk_size;
Ok(())
} else {
self.max_chunk_size.reset(rem)
}
}
"norm_level" => {
if rem.is_empty() {
self.norm_level = CdcOptions::default().norm_level;
Ok(())
} else {
self.norm_level.reset(rem)
}
}
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
}
}
}

/// `ConfigField` for `Option<CdcOptions>` — allows setting the option to
/// `"true"` (enable with defaults) or `"false"` (disable), in addition to
/// setting individual sub-fields like `min_chunk_size`.
impl ConfigField for Option<CdcOptions> {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
match self {
Some(s) => s.visit(v, key, description),
None => v.none(key, description),
}
}

fn set(&mut self, key: &str, value: &str) -> Result<()> {
if key.is_empty() {
match value.to_ascii_lowercase().as_str() {
"true" => {
*self = Some(CdcOptions::default());
Ok(())
}
"false" => {
*self = None;
Ok(())
}
_ => _config_err!(
"Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'"
),
}
} else {
self.get_or_insert_with(CdcOptions::default).set(key, value)
impl CdcOptions {
/// Returns enabled CDC options with the default chunking parameters.
///
/// Shorthand for `CdcOptions { enabled: true, ..Default::default() }`; combine
/// with struct-update syntax to override parameters, e.g.
/// `CdcOptions { min_chunk_size: 4096, ..CdcOptions::enabled() }`.
pub fn enabled() -> Self {
Self {
enabled: true,
..Default::default()
}
}

fn reset(&mut self, key: &str) -> Result<()> {
if key.is_empty() {
*self = None;
Ok(())
} else {
self.get_or_insert_with(CdcOptions::default).reset(key)
}
/// Returns disabled CDC options (equivalent to [`CdcOptions::default`]).
pub fn disabled() -> Self {
Self::default()
}
}

Expand Down Expand Up @@ -1036,11 +952,14 @@ config_namespace! {
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
/// parquet files. When `Some`, CDC is enabled with the given options; when `None`
/// (the default), CDC is disabled. When CDC is enabled, parallel writing is
/// automatically disabled since the chunker state must persist across row groups.
pub use_content_defined_chunking: Option<CdcOptions>, default = None
/// (writing) EXPERIMENTAL: Content-defined chunking (CDC) options when writing
/// parquet files. Disabled by default; toggle with
/// `content_defined_chunking.enabled = true|false`. The chunking parameters live
/// under the same prefix (e.g. `content_defined_chunking.min_chunk_size`). When
/// enabled, parallel writing is automatically disabled since the chunker state
/// must persist across row groups. Mirrors
/// `parquet::file::properties::WriterProperties::content_defined_chunking`.
pub content_defined_chunking: CdcOptions, default = Default::default()
}
}

Expand Down Expand Up @@ -4020,73 +3939,54 @@ mod tests {

#[cfg(feature = "parquet")]
#[test]
fn set_cdc_option_with_boolean_true() {
fn set_cdc_enabled_flag() {
use crate::config::ConfigOptions;

let mut config = ConfigOptions::default();
assert!(
config
.execution
.parquet
.use_content_defined_chunking
.is_none()
);
// CDC is disabled by default.
assert!(!config.execution.parquet.content_defined_chunking.enabled);

// Setting to "true" should enable CDC with default options
// `.enabled = true` enables CDC; parameters keep their defaults.
config
.set(
"datafusion.execution.parquet.use_content_defined_chunking",
"datafusion.execution.parquet.content_defined_chunking.enabled",
"true",
)
.unwrap();
let cdc = config
.execution
.parquet
.use_content_defined_chunking
.as_ref()
.expect("CDC should be enabled");
let cdc = &config.execution.parquet.content_defined_chunking;
assert!(cdc.enabled);
assert_eq!(cdc.min_chunk_size, 256 * 1024);
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
assert_eq!(cdc.norm_level, 0);

// Setting to "false" should disable CDC
// `.enabled = false` disables CDC.
config
.set(
"datafusion.execution.parquet.use_content_defined_chunking",
"datafusion.execution.parquet.content_defined_chunking.enabled",
"false",
)
.unwrap();
assert!(
config
.execution
.parquet
.use_content_defined_chunking
.is_none()
);
assert!(!config.execution.parquet.content_defined_chunking.enabled);
}

#[cfg(feature = "parquet")]
#[test]
fn set_cdc_option_with_subfields() {
fn set_cdc_param_does_not_enable() {
use crate::config::ConfigOptions;

let mut config = ConfigOptions::default();

// Setting sub-fields should also enable CDC
// Setting a parameter does NOT enable CDC (`enabled` is a distinct field,
// defaulting to false), and the result is independent of key order.
config
.set(
"datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size",
"datafusion.execution.parquet.content_defined_chunking.min_chunk_size",
"1024",
)
.unwrap();
let cdc = config
.execution
.parquet
.use_content_defined_chunking
.as_ref()
.expect("CDC should be enabled");
let cdc = &config.execution.parquet.content_defined_chunking;
assert!(!cdc.enabled);
assert_eq!(cdc.min_chunk_size, 1024);
// Other fields should be defaults
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
assert_eq!(cdc.norm_level, 0);
}
Expand Down
Loading
Loading