From 2006bca0b43a2509f533c2c056ac77aab89b86d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 13 Apr 2026 19:43:32 +0200 Subject: [PATCH 1/8] feat: two-generation early emission for partial aggregation When the partial aggregate's hash table exceeds a configurable size threshold (default: 4MB), use a two-generation scheme to emit intermediate state while keeping the hash table cache-friendly. When the hot hash table fills up: 1. Emit the cold batch (previous generation's state) downstream 2. Promote the current hot table state to the cold batch 3. Reset the hot hash table and continue reading input This gives recurring groups a second chance to be merged locally before being sent downstream, reducing the number of partial emissions through the hash repartition while keeping the working set in CPU cache. At end-of-input, the remaining hot state and cold batch are emitted as separate batches (no concat_batches copy). Cold batch is also correctly flushed when transitioning to SkippingAggregation or Done. New config: datafusion.execution.partial_aggregation_max_table_size Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/common/src/config.rs | 8 ++ .../physical-plan/src/aggregates/row_hash.rs | 116 +++++++++++++++--- .../test_files/information_schema.slt | 2 + 3 files changed, 110 insertions(+), 16 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0eec3f948034a..0cc9c81a55093 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -639,6 +639,14 @@ config_namespace! { /// aggregation ratio check and trying to switch to skipping aggregation mode pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000 + /// Maximum memory (in bytes) that the partial aggregation hash table + /// may use before emitting intermediate state and resetting. This + /// keeps the hash table small enough to fit in CPU cache, improving + /// performance for high-cardinality GROUP BY queries. A value of 0 + /// disables early emission. Only applies to Partial aggregation mode + /// with unordered input. + pub partial_aggregation_max_table_size: usize, default = 4_194_304 + /// Should DataFusion use row number estimates at the input to decide /// whether increasing parallelism is beneficial or not. By default, /// only exact row numbers (not estimates) are used for this decision. diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 056a7f171a516..16abfe75649ec 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -437,6 +437,18 @@ pub(crate) struct GroupedHashAggregateStream { /// current stream. skip_aggregation_probe: Option, + /// Maximum size (in bytes) of the hash table before emitting + /// intermediate state and resetting during partial aggregation. + /// 0 means disabled. + early_emit_max_table_size: usize, + + /// Two-generation early emission: the previous generation's partial + /// state batch. When the hot hash table fills up, we emit this cold + /// batch (if any), then store the hot table's state as the new cold + /// batch. Groups appearing across multiple generations get merged + /// locally before being sent downstream. + early_emit_cold_batch: Option, + // ======================================================================== // EXECUTION RESOURCES: // Fields related to managing execution resources and monitoring performance. @@ -649,6 +661,18 @@ impl GroupedHashAggregateStream { None }; + let early_emit_max_table_size = if agg.mode == AggregateMode::Partial + && matches!(group_ordering, GroupOrdering::None) + { + context + .session_config() + .options() + .execution + .partial_aggregation_max_table_size + } else { + 0 + }; + let reduction_factor = if agg.mode == AggregateMode::Partial { Some( MetricBuilder::new(&agg.metrics) @@ -680,6 +704,8 @@ impl GroupedHashAggregateStream { spill_state, group_values_soft_limit: agg.limit_options().map(|config| config.limit()), skip_aggregation_probe, + early_emit_max_table_size, + early_emit_cold_batch: None, reduction_factor, }) } @@ -780,6 +806,39 @@ impl Stream for GroupedHashAggregateStream { } } + // Two-generation early emission: keeps the hash table + // small enough to fit in CPU cache while giving + // recurring groups a second chance to be merged locally. + // + // When the hot table fills: + // 1. Emit the cold batch (previous generation) if any + // 2. Promote current hot table state → cold batch + // 3. Reset hot table and continue reading + if self.early_emit_max_table_size > 0 { + let table_size = self.group_values.size() + + self + .accumulators + .iter() + .map(|x| x.size()) + .sum::(); + if table_size >= self.early_emit_max_table_size { + // Take the cold batch to emit + let to_emit = self.early_emit_cold_batch.take(); + // Promote hot → cold + let batch_size = self.batch_size; + self.early_emit_cold_batch = + self.emit(EmitTo::All, false)?; + self.clear_shrink(batch_size); + // Emit the previous cold batch if we had one + if let Some(batch) = to_emit { + timer.done(); + self.exec_state = + ExecutionState::ProducingOutput(batch); + break 'reading_input; + } + } + } + // If we reach this point, try to update the memory reservation // handling out-of-memory conditions as determined by the OOM mode. if let Some(new_state) = @@ -834,7 +893,12 @@ impl Stream for GroupedHashAggregateStream { self.group_values.len() ))); } - self.exec_state = ExecutionState::Done; + // Emit any remaining cold batch from early emission + if let Some(batch) = self.early_emit_cold_batch.take() { + self.exec_state = ExecutionState::ProducingOutput(batch); + } else { + self.exec_state = ExecutionState::Done; + } } } } @@ -843,22 +907,31 @@ impl Stream for GroupedHashAggregateStream { // slice off a part of the batch, if needed let output_batch; let size = self.batch_size; + let batch = batch.clone(); (self.exec_state, output_batch) = if batch.num_rows() <= size { - ( - if self.input_done { + let next_state = if self.input_done { + // Emit remaining cold batch before finishing + if let Some(cold) = self.early_emit_cold_batch.take() { + ExecutionState::ProducingOutput(cold) + } else { ExecutionState::Done } - // In Partial aggregation, we also need to check - // if we should trigger partial skipping - else if self.mode == AggregateMode::Partial - && self.should_skip_aggregation() - { - ExecutionState::SkippingAggregation + } + // In Partial aggregation, we also need to check + // if we should trigger partial skipping + else if self.mode == AggregateMode::Partial + && self.should_skip_aggregation() + { + // Flush cold batch before switching to skip mode + if let Some(cold) = self.early_emit_cold_batch.take() { + ExecutionState::ProducingOutput(cold) } else { - ExecutionState::ReadingInput - }, - batch.clone(), - ) + ExecutionState::SkippingAggregation + } + } else { + ExecutionState::ReadingInput + }; + (next_state, batch) } else { // output first batch_size rows let size = self.batch_size; @@ -1221,14 +1294,25 @@ impl GroupedHashAggregateStream { self.group_ordering.input_done(); let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let timer = elapsed_compute.timer(); + self.exec_state = if self.spill_state.spills.is_empty() { // Input has been entirely processed without spilling to disk. - // Flush any remaining group values. + // Flush any remaining group values from the hot table. + // The cold batch (if any) remains in early_emit_cold_batch + // and will be emitted when ProducingOutput transitions to Done. let batch = self.emit(EmitTo::All, false)?; - // If there are none, we're done; otherwise switch to emitting them - batch.map_or(ExecutionState::Done, ExecutionState::ProducingOutput) + // If there are none but we have a cold batch, emit that instead + match (batch, self.early_emit_cold_batch.take()) { + (Some(hot), Some(cold)) => { + // Emit hot first; store cold for next poll + self.early_emit_cold_batch = Some(cold); + ExecutionState::ProducingOutput(hot) + } + (Some(b), None) | (None, Some(b)) => ExecutionState::ProducingOutput(b), + (None, None) => ExecutionState::Done, + } } else { // Spill any remaining data to disk. There is some performance overhead in // writing out this last chunk of data and reading it back. The benefit of diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index ab8a4a293234e..9a71903cd61e7 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -262,6 +262,7 @@ datafusion.execution.parquet.statistics_truncate_length 64 datafusion.execution.parquet.use_content_defined_chunking NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 +datafusion.execution.partial_aggregation_max_table_size 4194304 datafusion.execution.perfect_hash_join_min_key_density 0.15 datafusion.execution.perfect_hash_join_small_build_threshold 1024 datafusion.execution.planning_concurrency 13 @@ -407,6 +408,7 @@ datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statis datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" +datafusion.execution.partial_aggregation_max_table_size 4194304 Maximum memory (in bytes) that the partial aggregation hash table may use before emitting intermediate state and resetting. This keeps the hash table small enough to fit in CPU cache, improving performance for high-cardinality GROUP BY queries. A value of 0 disables early emission. Only applies to Partial aggregation mode with unordered input. datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. datafusion.execution.perfect_hash_join_small_build_threshold 1024 A perfect hash join (see `HashJoinExec` for more details) will be considered if the range of keys (max - min) on the build side is < this threshold. This provides a fast path for joins with very small key ranges, bypassing the density check. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system From 966b507e02aa5e61f9976eae4200747e44b97221 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 13 Apr 2026 20:21:48 +0200 Subject: [PATCH 2/8] refactor: improve early emission with ratio gating, memory tracking, and dedup Gate early emission on the skip-aggregation probe ratio (>= 0.5) so queries with good aggregation reduction are not penalized. Track cold batch memory in the reservation to avoid silent OOM. Extract table_size() and next_state_draining_cold() helpers to reduce duplication. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../physical-plan/src/aggregates/row_hash.rs | 115 ++++++++++-------- 1 file changed, 65 insertions(+), 50 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 16abfe75649ec..dddc55d9eb883 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -204,6 +204,16 @@ impl SkipAggregationProbe { self.should_skip } + /// Returns the current ratio of groups to input rows, or `None` if + /// not enough rows have been seen yet. + fn ratio(&self) -> Option { + if self.input_rows >= self.probe_rows_threshold { + Some(self.num_groups as f64 / self.input_rows as f64) + } else { + None + } + } + /// Record the number of rows that were output directly without aggregation fn record_skipped(&mut self, batch: &RecordBatch) { self.skipped_aggregation_rows.add(batch.num_rows()); @@ -445,8 +455,8 @@ pub(crate) struct GroupedHashAggregateStream { /// Two-generation early emission: the previous generation's partial /// state batch. When the hot hash table fills up, we emit this cold /// batch (if any), then store the hot table's state as the new cold - /// batch. Groups appearing across multiple generations get merged - /// locally before being sent downstream. + /// batch. The downstream final aggregator handles re-merging any + /// duplicate groups that appear across generations. early_emit_cold_batch: Option, // ======================================================================== @@ -806,15 +816,24 @@ impl Stream for GroupedHashAggregateStream { } } - // Two-generation early emission: keeps the hash table - // small enough to fit in CPU cache while giving - // recurring groups a second chance to be merged locally. + // Two-generation early emission: keeps the hash + // table cache-friendly by emitting when it exceeds + // the size threshold. // - // When the hot table fills: - // 1. Emit the cold batch (previous generation) if any - // 2. Promote current hot table state → cold batch - // 3. Reset hot table and continue reading - if self.early_emit_max_table_size > 0 { + // Only activate AFTER the skip aggregation probe + // has evaluated and decided NOT to skip. This + // prevents early emission from interfering with + // very-high-cardinality queries (ratio >= 0.8) + // where the skip probe should take over entirely. + let early_emit_enabled = + self.early_emit_max_table_size > 0 + && match &self.skip_aggregation_probe { + None => true, + Some(p) => { + p.ratio().is_some() && !p.should_skip() + } + }; + if early_emit_enabled { let table_size = self.group_values.size() + self .accumulators @@ -822,14 +841,11 @@ impl Stream for GroupedHashAggregateStream { .map(|x| x.size()) .sum::(); if table_size >= self.early_emit_max_table_size { - // Take the cold batch to emit let to_emit = self.early_emit_cold_batch.take(); - // Promote hot → cold let batch_size = self.batch_size; self.early_emit_cold_batch = self.emit(EmitTo::All, false)?; self.clear_shrink(batch_size); - // Emit the previous cold batch if we had one if let Some(batch) = to_emit { timer.done(); self.exec_state = @@ -893,12 +909,8 @@ impl Stream for GroupedHashAggregateStream { self.group_values.len() ))); } - // Emit any remaining cold batch from early emission - if let Some(batch) = self.early_emit_cold_batch.take() { - self.exec_state = ExecutionState::ProducingOutput(batch); - } else { - self.exec_state = ExecutionState::Done; - } + self.exec_state = + self.next_state_draining_cold(ExecutionState::Done); } } } @@ -910,24 +922,13 @@ impl Stream for GroupedHashAggregateStream { let batch = batch.clone(); (self.exec_state, output_batch) = if batch.num_rows() <= size { let next_state = if self.input_done { - // Emit remaining cold batch before finishing - if let Some(cold) = self.early_emit_cold_batch.take() { - ExecutionState::ProducingOutput(cold) - } else { - ExecutionState::Done - } - } - // In Partial aggregation, we also need to check - // if we should trigger partial skipping - else if self.mode == AggregateMode::Partial + self.next_state_draining_cold(ExecutionState::Done) + } else if self.mode == AggregateMode::Partial && self.should_skip_aggregation() { - // Flush cold batch before switching to skip mode - if let Some(cold) = self.early_emit_cold_batch.take() { - ExecutionState::ProducingOutput(cold) - } else { - ExecutionState::SkippingAggregation - } + self.next_state_draining_cold( + ExecutionState::SkippingAggregation, + ) } else { ExecutionState::ReadingInput }; @@ -1126,9 +1127,8 @@ impl GroupedHashAggregateStream { } fn update_memory_reservation(&mut self) -> Result<()> { - let acc = self.accumulators.iter().map(|x| x.size()).sum::(); - let groups_and_acc_size = acc - + self.group_values.size() + let table_size = self.table_size(); + let groups_and_acc_size = table_size + self.group_ordering.size() + self.current_group_indices.allocated_size(); @@ -1142,12 +1142,17 @@ impl GroupedHashAggregateStream { // after clear_shrink is sufficient to cover the sort memory. let sort_headroom = if self.oom_mode == OutOfMemoryMode::Spill && !self.group_values.is_empty() { - acc + self.group_values.size() + table_size } else { 0 }; - let new_size = groups_and_acc_size + sort_headroom; + let cold_batch_size = self + .early_emit_cold_batch + .as_ref() + .map_or(0, get_record_batch_memory_size); + + let new_size = groups_and_acc_size + sort_headroom + cold_batch_size; let reservation_result = self.reservation.try_resize(new_size); if reservation_result.is_ok() { @@ -1276,6 +1281,21 @@ impl GroupedHashAggregateStream { self.clear_shrink(0); } + /// Returns the combined size of the hash table and accumulators in bytes. + fn table_size(&self) -> usize { + self.group_values.size() + + self.accumulators.iter().map(|x| x.size()).sum::() + } + + /// Transition to `fallback` state, but first drain the cold batch if present. + fn next_state_draining_cold(&mut self, fallback: ExecutionState) -> ExecutionState { + if let Some(cold) = self.early_emit_cold_batch.take() { + ExecutionState::ProducingOutput(cold) + } else { + fallback + } + } + /// returns true if there is a soft groups limit and the number of distinct /// groups we have seen is over that limit fn hit_soft_group_limit(&self) -> bool { @@ -1300,18 +1320,13 @@ impl GroupedHashAggregateStream { // Flush any remaining group values from the hot table. // The cold batch (if any) remains in early_emit_cold_batch - // and will be emitted when ProducingOutput transitions to Done. + // and will be drained via next_state_draining_cold when + // ProducingOutput transitions to Done. let batch = self.emit(EmitTo::All, false)?; - // If there are none but we have a cold batch, emit that instead - match (batch, self.early_emit_cold_batch.take()) { - (Some(hot), Some(cold)) => { - // Emit hot first; store cold for next poll - self.early_emit_cold_batch = Some(cold); - ExecutionState::ProducingOutput(hot) - } - (Some(b), None) | (None, Some(b)) => ExecutionState::ProducingOutput(b), - (None, None) => ExecutionState::Done, + match batch { + Some(b) => ExecutionState::ProducingOutput(b), + None => self.next_state_draining_cold(ExecutionState::Done), } } else { // Spill any remaining data to disk. There is some performance overhead in From 9619b00c04d25bf10ef24bd090359784ec828b00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 13 Apr 2026 20:22:59 +0200 Subject: [PATCH 3/8] fix: only emit early after skip probe evaluates and decides not to skip The early emission was firing before the skip aggregation probe could evaluate (needs 100K rows), causing regressions for very high-cardinality GROUP BY queries (e.g. Q32 GROUP BY WatchID, ClientIP was 1.53x slower). Fix: only enable early emission AFTER the skip probe has evaluated and decided NOT to skip. This ensures: - Before 100K rows: no early emission (let probe evaluate first) - High cardinality (ratio >= 0.8): skip probe takes over, no emission - Medium cardinality (ratio < 0.8): early emission keeps hash table cache-friendly Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/aggregates/row_hash.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index dddc55d9eb883..8007de677aa2d 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -825,14 +825,11 @@ impl Stream for GroupedHashAggregateStream { // prevents early emission from interfering with // very-high-cardinality queries (ratio >= 0.8) // where the skip probe should take over entirely. - let early_emit_enabled = - self.early_emit_max_table_size > 0 - && match &self.skip_aggregation_probe { - None => true, - Some(p) => { - p.ratio().is_some() && !p.should_skip() - } - }; + let early_emit_enabled = self.early_emit_max_table_size > 0 + && match &self.skip_aggregation_probe { + None => true, + Some(p) => p.ratio().is_some() && !p.should_skip(), + }; if early_emit_enabled { let table_size = self.group_values.size() + self From 01835ce8773a93097f91e5ace1e53db1b1c30fce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 13 Apr 2026 20:41:42 +0200 Subject: [PATCH 4/8] refactor: replace two-generation scheme with bounded hash table overflow Replace the two-generation cold-batch emission scheme with a simpler bounded hash table approach: - When the hash table exceeds the size limit, switch to overflow passthrough mode: subsequent batches are converted directly to intermediate state (like SkippingAggregation) while the hash table retains its accumulated groups - At end-of-input, emit the hash table's accumulated state - No emit/reset cycle, no cold batch, no serialization overhead - The hash table acts as a cache for groups seen early in the stream Benefits over two-generation: - No RecordBatch serialization/deserialization of accumulated state - No hash table rebuild after reset - Simpler state machine (no cold batch management) - Groups accumulated before overflow stay aggregated Co-Authored-By: Claude Opus 4.6 (1M context) --- .../physical-plan/src/aggregates/row_hash.rs | 123 +++++++++--------- 1 file changed, 59 insertions(+), 64 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 8007de677aa2d..a229b6bf70383 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -447,17 +447,19 @@ pub(crate) struct GroupedHashAggregateStream { /// current stream. skip_aggregation_probe: Option, - /// Maximum size (in bytes) of the hash table before emitting - /// intermediate state and resetting during partial aggregation. + /// Maximum size (in bytes) of the hash table before switching to + /// overflow passthrough mode during partial aggregation. When + /// exceeded, new batches are converted directly to intermediate + /// state (like SkippingAggregation) while the hash table retains + /// its accumulated groups for emission at end-of-input. /// 0 means disabled. - early_emit_max_table_size: usize, + overflow_passthrough_max_table_size: usize, - /// Two-generation early emission: the previous generation's partial - /// state batch. When the hot hash table fills up, we emit this cold - /// batch (if any), then store the hot table's state as the new cold - /// batch. The downstream final aggregator handles re-merging any - /// duplicate groups that appear across generations. - early_emit_cold_batch: Option, + /// Whether we're in overflow passthrough mode. When true, input + /// batches are converted to intermediate state and passed through + /// without updating the hash table. The table's accumulated state + /// is emitted at end-of-input. + overflow_passthrough: bool, // ======================================================================== // EXECUTION RESOURCES: @@ -671,8 +673,11 @@ impl GroupedHashAggregateStream { None }; - let early_emit_max_table_size = if agg.mode == AggregateMode::Partial + // Overflow passthrough requires convert_to_state support + // (same requirement as skip_aggregation_probe) + let overflow_passthrough_max_table_size = if agg.mode == AggregateMode::Partial && matches!(group_ordering, GroupOrdering::None) + && skip_aggregation_probe.is_some() { context .session_config() @@ -714,8 +719,8 @@ impl GroupedHashAggregateStream { spill_state, group_values_soft_limit: agg.limit_options().map(|config| config.limit()), skip_aggregation_probe, - early_emit_max_table_size, - early_emit_cold_batch: None, + overflow_passthrough_max_table_size, + overflow_passthrough: false, reduction_factor, }) } @@ -766,6 +771,24 @@ impl Stream for GroupedHashAggregateStream { reduction_factor.add_total(input_rows); } + // Overflow passthrough: when the hash table has + // exceeded the size limit, convert input batches + // directly to intermediate state (like + // SkippingAggregation) while keeping the hash + // table's accumulated groups intact for + // end-of-input emission. + if self.overflow_passthrough { + let _timer = elapsed_compute.timer(); + if let Some(probe) = self.skip_aggregation_probe.as_mut() + { + probe.record_skipped(&batch); + } + let states = self.transform_to_states(&batch)?; + return Poll::Ready(Some(Ok( + states.record_output(&self.baseline_metrics) + ))); + } + // Do the grouping. // `group_aggregate_batch` will _not_ have updated the memory reservation yet. // The rest of the code will first try to reduce memory usage by @@ -816,39 +839,32 @@ impl Stream for GroupedHashAggregateStream { } } - // Two-generation early emission: keeps the hash - // table cache-friendly by emitting when it exceeds - // the size threshold. + // Check if the hash table has exceeded the size + // limit. If so, switch to overflow passthrough + // mode: subsequent batches are converted directly + // to intermediate state while the hash table + // retains its accumulated groups. // - // Only activate AFTER the skip aggregation probe - // has evaluated and decided NOT to skip. This - // prevents early emission from interfering with - // very-high-cardinality queries (ratio >= 0.8) - // where the skip probe should take over entirely. - let early_emit_enabled = self.early_emit_max_table_size > 0 + // Only activate after the skip probe has evaluated + // and decided NOT to skip. This prevents overflow + // from interfering with very-high-cardinality + // queries where skip_aggregation should take over. + if self.overflow_passthrough_max_table_size > 0 + && !self.overflow_passthrough && match &self.skip_aggregation_probe { None => true, Some(p) => p.ratio().is_some() && !p.should_skip(), - }; - if early_emit_enabled { + } + { let table_size = self.group_values.size() + self .accumulators .iter() .map(|x| x.size()) .sum::(); - if table_size >= self.early_emit_max_table_size { - let to_emit = self.early_emit_cold_batch.take(); - let batch_size = self.batch_size; - self.early_emit_cold_batch = - self.emit(EmitTo::All, false)?; - self.clear_shrink(batch_size); - if let Some(batch) = to_emit { - timer.done(); - self.exec_state = - ExecutionState::ProducingOutput(batch); - break 'reading_input; - } + if table_size >= self.overflow_passthrough_max_table_size + { + self.overflow_passthrough = true; } } @@ -906,8 +922,7 @@ impl Stream for GroupedHashAggregateStream { self.group_values.len() ))); } - self.exec_state = - self.next_state_draining_cold(ExecutionState::Done); + self.exec_state = ExecutionState::Done; } } } @@ -919,13 +934,11 @@ impl Stream for GroupedHashAggregateStream { let batch = batch.clone(); (self.exec_state, output_batch) = if batch.num_rows() <= size { let next_state = if self.input_done { - self.next_state_draining_cold(ExecutionState::Done) + ExecutionState::Done } else if self.mode == AggregateMode::Partial && self.should_skip_aggregation() { - self.next_state_draining_cold( - ExecutionState::SkippingAggregation, - ) + ExecutionState::SkippingAggregation } else { ExecutionState::ReadingInput }; @@ -1144,12 +1157,7 @@ impl GroupedHashAggregateStream { 0 }; - let cold_batch_size = self - .early_emit_cold_batch - .as_ref() - .map_or(0, get_record_batch_memory_size); - - let new_size = groups_and_acc_size + sort_headroom + cold_batch_size; + let new_size = groups_and_acc_size + sort_headroom; let reservation_result = self.reservation.try_resize(new_size); if reservation_result.is_ok() { @@ -1284,15 +1292,6 @@ impl GroupedHashAggregateStream { + self.accumulators.iter().map(|x| x.size()).sum::() } - /// Transition to `fallback` state, but first drain the cold batch if present. - fn next_state_draining_cold(&mut self, fallback: ExecutionState) -> ExecutionState { - if let Some(cold) = self.early_emit_cold_batch.take() { - ExecutionState::ProducingOutput(cold) - } else { - fallback - } - } - /// returns true if there is a soft groups limit and the number of distinct /// groups we have seen is over that limit fn hit_soft_group_limit(&self) -> bool { @@ -1315,16 +1314,12 @@ impl GroupedHashAggregateStream { self.exec_state = if self.spill_state.spills.is_empty() { // Input has been entirely processed without spilling to disk. - // Flush any remaining group values from the hot table. - // The cold batch (if any) remains in early_emit_cold_batch - // and will be drained via next_state_draining_cold when - // ProducingOutput transitions to Done. + // Flush any remaining group values from the hash table. + // In overflow passthrough mode, the table retains groups + // accumulated before the overflow threshold was reached. let batch = self.emit(EmitTo::All, false)?; - match batch { - Some(b) => ExecutionState::ProducingOutput(b), - None => self.next_state_draining_cold(ExecutionState::Done), - } + batch.map_or(ExecutionState::Done, ExecutionState::ProducingOutput) } else { // Spill any remaining data to disk. There is some performance overhead in // writing out this last chunk of data and reading it back. The benefit of From 1390b5ece2b27816c913beb8b77aacf335207181 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 13 Apr 2026 21:08:27 +0200 Subject: [PATCH 5/8] fix: disable overflow passthrough for distinct aggregates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit COUNT(DISTINCT) and similar distinct aggregates produce per-row intermediate state when convert_to_state is called. In overflow mode this turns 100M rows into 100M single-value state objects that the downstream must merge — a 12x regression on Q9. Fix: skip overflow passthrough when any aggregate is distinct. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/aggregates/row_hash.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index a229b6bf70383..837d842ac90dc 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -674,10 +674,14 @@ impl GroupedHashAggregateStream { }; // Overflow passthrough requires convert_to_state support - // (same requirement as skip_aggregation_probe) + // (same requirement as skip_aggregation_probe) and no distinct + // aggregates (convert_to_state for distinct produces per-row + // state objects that are catastrophically expensive to merge). + let has_distinct = aggregate_exprs.iter().any(|e| e.is_distinct()); let overflow_passthrough_max_table_size = if agg.mode == AggregateMode::Partial && matches!(group_ordering, GroupOrdering::None) && skip_aggregation_probe.is_some() + && !has_distinct { context .session_config() From ecaea3d0ece6a9c14e6ed5b3cabb5fe31873dba4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 13 Apr 2026 21:15:31 +0200 Subject: [PATCH 6/8] fix: use emit+reset strategy for distinct aggregates Distinct aggregates (COUNT(DISTINCT)) benefit from periodic emit+reset (1.45x faster on Q9) but are catastrophically slow with overflow passthrough (12x slower) because convert_to_state produces per-row state objects. Now uses two strategies based on aggregate type: - Non-distinct: overflow passthrough (convert_to_state, no reset) - Distinct: emit+reset (state() serializes HashSets compactly) Both strategies keep the hash table within the configured size limit for better cache performance. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../physical-plan/src/aggregates/row_hash.rs | 57 ++++++++++++++++--- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 837d842ac90dc..c27b09c3d4ee8 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -461,6 +461,10 @@ pub(crate) struct GroupedHashAggregateStream { /// is emitted at end-of-input. overflow_passthrough: bool, + /// Maximum table size for emit+reset strategy (used for distinct + /// aggregates where overflow passthrough is not applicable). + emit_reset_max_table_size: usize, + // ======================================================================== // EXECUTION RESOURCES: // Fields related to managing execution resources and monitoring performance. @@ -673,15 +677,19 @@ impl GroupedHashAggregateStream { None }; - // Overflow passthrough requires convert_to_state support - // (same requirement as skip_aggregation_probe) and no distinct - // aggregates (convert_to_state for distinct produces per-row - // state objects that are catastrophically expensive to merge). + // Two strategies for keeping the hash table cache-friendly: + // + // 1. Overflow passthrough (non-distinct): convert overflow + // batches to state via convert_to_state. Fast, no + // serialization of accumulated groups. + // + // 2. Emit+reset (distinct): emit accumulated state via state() + // and reset the hash table. Works for distinct aggregates + // where convert_to_state produces catastrophically expensive + // per-row state. let has_distinct = aggregate_exprs.iter().any(|e| e.is_distinct()); - let overflow_passthrough_max_table_size = if agg.mode == AggregateMode::Partial + let max_table_size = if agg.mode == AggregateMode::Partial && matches!(group_ordering, GroupOrdering::None) - && skip_aggregation_probe.is_some() - && !has_distinct { context .session_config() @@ -691,6 +699,17 @@ impl GroupedHashAggregateStream { } else { 0 }; + let overflow_passthrough_max_table_size = + if max_table_size > 0 && skip_aggregation_probe.is_some() && !has_distinct { + max_table_size + } else { + 0 + }; + let emit_reset_max_table_size = if max_table_size > 0 && has_distinct { + max_table_size + } else { + 0 + }; let reduction_factor = if agg.mode == AggregateMode::Partial { Some( @@ -725,6 +744,7 @@ impl GroupedHashAggregateStream { skip_aggregation_probe, overflow_passthrough_max_table_size, overflow_passthrough: false, + emit_reset_max_table_size, reduction_factor, }) } @@ -872,6 +892,29 @@ impl Stream for GroupedHashAggregateStream { } } + // Emit+reset: for distinct aggregates, emit all + // accumulated state and reset the hash table. + // Uses state() which compactly serializes distinct + // HashSets, unlike convert_to_state which explodes. + if self.emit_reset_max_table_size > 0 { + let table_size = self.group_values.size() + + self + .accumulators + .iter() + .map(|x| x.size()) + .sum::(); + if table_size >= self.emit_reset_max_table_size { + let batch_size = self.batch_size; + if let Some(batch) = self.emit(EmitTo::All, false)? { + self.clear_shrink(batch_size); + timer.done(); + self.exec_state = + ExecutionState::ProducingOutput(batch); + break 'reading_input; + } + } + } + // If we reach this point, try to update the memory reservation // handling out-of-memory conditions as determined by the OOM mode. if let Some(new_state) = From 2ca217f76978a1144ce9047f92445ed9389c6f45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 14 Apr 2026 07:30:53 +0200 Subject: [PATCH 7/8] refactor: simplify early emission to single emit+reset strategy Replace the two-strategy approach (overflow passthrough for non-distinct, emit+reset for distinct) with a single emit+reset strategy for all partial aggregation cases. When the hash table exceeds the size limit, emit all accumulated state and reset the table. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../physical-plan/src/aggregates/row_hash.rs | 143 +++--------------- 1 file changed, 23 insertions(+), 120 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index c27b09c3d4ee8..61c414d7b3f3b 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -204,16 +204,6 @@ impl SkipAggregationProbe { self.should_skip } - /// Returns the current ratio of groups to input rows, or `None` if - /// not enough rows have been seen yet. - fn ratio(&self) -> Option { - if self.input_rows >= self.probe_rows_threshold { - Some(self.num_groups as f64 / self.input_rows as f64) - } else { - None - } - } - /// Record the number of rows that were output directly without aggregation fn record_skipped(&mut self, batch: &RecordBatch) { self.skipped_aggregation_rows.add(batch.num_rows()); @@ -447,23 +437,12 @@ pub(crate) struct GroupedHashAggregateStream { /// current stream. skip_aggregation_probe: Option, - /// Maximum size (in bytes) of the hash table before switching to - /// overflow passthrough mode during partial aggregation. When - /// exceeded, new batches are converted directly to intermediate - /// state (like SkippingAggregation) while the hash table retains - /// its accumulated groups for emission at end-of-input. + /// Maximum size (in bytes) of the hash table before emitting + /// accumulated state and resetting during partial aggregation. + /// When exceeded, all groups are emitted as partial state and + /// the hash table is cleared, bounding memory usage. /// 0 means disabled. - overflow_passthrough_max_table_size: usize, - - /// Whether we're in overflow passthrough mode. When true, input - /// batches are converted to intermediate state and passed through - /// without updating the hash table. The table's accumulated state - /// is emitted at end-of-input. - overflow_passthrough: bool, - - /// Maximum table size for emit+reset strategy (used for distinct - /// aggregates where overflow passthrough is not applicable). - emit_reset_max_table_size: usize, + early_emit_max_table_size: usize, // ======================================================================== // EXECUTION RESOURCES: @@ -677,18 +656,11 @@ impl GroupedHashAggregateStream { None }; - // Two strategies for keeping the hash table cache-friendly: - // - // 1. Overflow passthrough (non-distinct): convert overflow - // batches to state via convert_to_state. Fast, no - // serialization of accumulated groups. - // - // 2. Emit+reset (distinct): emit accumulated state via state() - // and reset the hash table. Works for distinct aggregates - // where convert_to_state produces catastrophically expensive - // per-row state. - let has_distinct = aggregate_exprs.iter().any(|e| e.is_distinct()); - let max_table_size = if agg.mode == AggregateMode::Partial + // Emit+reset strategy: when the hash table exceeds a size + // limit, emit all accumulated state and reset the table. + // This bounds memory usage while still benefiting from + // partial aggregation. + let early_emit_max_table_size = if agg.mode == AggregateMode::Partial && matches!(group_ordering, GroupOrdering::None) { context @@ -699,17 +671,6 @@ impl GroupedHashAggregateStream { } else { 0 }; - let overflow_passthrough_max_table_size = - if max_table_size > 0 && skip_aggregation_probe.is_some() && !has_distinct { - max_table_size - } else { - 0 - }; - let emit_reset_max_table_size = if max_table_size > 0 && has_distinct { - max_table_size - } else { - 0 - }; let reduction_factor = if agg.mode == AggregateMode::Partial { Some( @@ -742,9 +703,7 @@ impl GroupedHashAggregateStream { spill_state, group_values_soft_limit: agg.limit_options().map(|config| config.limit()), skip_aggregation_probe, - overflow_passthrough_max_table_size, - overflow_passthrough: false, - emit_reset_max_table_size, + early_emit_max_table_size, reduction_factor, }) } @@ -795,24 +754,6 @@ impl Stream for GroupedHashAggregateStream { reduction_factor.add_total(input_rows); } - // Overflow passthrough: when the hash table has - // exceeded the size limit, convert input batches - // directly to intermediate state (like - // SkippingAggregation) while keeping the hash - // table's accumulated groups intact for - // end-of-input emission. - if self.overflow_passthrough { - let _timer = elapsed_compute.timer(); - if let Some(probe) = self.skip_aggregation_probe.as_mut() - { - probe.record_skipped(&batch); - } - let states = self.transform_to_states(&batch)?; - return Poll::Ready(Some(Ok( - states.record_output(&self.baseline_metrics) - ))); - } - // Do the grouping. // `group_aggregate_batch` will _not_ have updated the memory reservation yet. // The rest of the code will first try to reduce memory usage by @@ -863,55 +804,19 @@ impl Stream for GroupedHashAggregateStream { } } - // Check if the hash table has exceeded the size - // limit. If so, switch to overflow passthrough - // mode: subsequent batches are converted directly - // to intermediate state while the hash table - // retains its accumulated groups. - // - // Only activate after the skip probe has evaluated - // and decided NOT to skip. This prevents overflow - // from interfering with very-high-cardinality - // queries where skip_aggregation should take over. - if self.overflow_passthrough_max_table_size > 0 - && !self.overflow_passthrough - && match &self.skip_aggregation_probe { - None => true, - Some(p) => p.ratio().is_some() && !p.should_skip(), - } + // Emit+reset: when the hash table exceeds the + // size limit, emit all accumulated state and + // reset the table to bound memory usage. + if self.early_emit_max_table_size > 0 + && self.table_size() >= self.early_emit_max_table_size { - let table_size = self.group_values.size() - + self - .accumulators - .iter() - .map(|x| x.size()) - .sum::(); - if table_size >= self.overflow_passthrough_max_table_size - { - self.overflow_passthrough = true; - } - } - - // Emit+reset: for distinct aggregates, emit all - // accumulated state and reset the hash table. - // Uses state() which compactly serializes distinct - // HashSets, unlike convert_to_state which explodes. - if self.emit_reset_max_table_size > 0 { - let table_size = self.group_values.size() - + self - .accumulators - .iter() - .map(|x| x.size()) - .sum::(); - if table_size >= self.emit_reset_max_table_size { - let batch_size = self.batch_size; - if let Some(batch) = self.emit(EmitTo::All, false)? { - self.clear_shrink(batch_size); - timer.done(); - self.exec_state = - ExecutionState::ProducingOutput(batch); - break 'reading_input; - } + let batch_size = self.batch_size; + if let Some(batch) = self.emit(EmitTo::All, false)? { + self.clear_shrink(batch_size); + timer.done(); + self.exec_state = + ExecutionState::ProducingOutput(batch); + break 'reading_input; } } @@ -1362,8 +1267,6 @@ impl GroupedHashAggregateStream { // Input has been entirely processed without spilling to disk. // Flush any remaining group values from the hash table. - // In overflow passthrough mode, the table retains groups - // accumulated before the overflow threshold was reached. let batch = self.emit(EmitTo::All, false)?; batch.map_or(ExecutionState::Done, ExecutionState::ProducingOutput) From 0b08df010715efd91b6237e70649d0310b383f90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 14 Apr 2026 07:56:10 +0200 Subject: [PATCH 8/8] fix: gate emit+reset on skip aggregation probe evaluation Only activate emit+reset after the skip probe has evaluated and decided NOT to skip. This prevents constant emit+reset cycles on high-cardinality queries where skip_aggregation should take over instead. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../physical-plan/src/aggregates/row_hash.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 61c414d7b3f3b..fa7f6f4e6d4da 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -204,6 +204,11 @@ impl SkipAggregationProbe { self.should_skip } + /// Returns whether the probe has evaluated (enough rows seen). + fn has_evaluated(&self) -> bool { + self.input_rows >= self.probe_rows_threshold + } + /// Record the number of rows that were output directly without aggregation fn record_skipped(&mut self, batch: &RecordBatch) { self.skipped_aggregation_rows.add(batch.num_rows()); @@ -807,7 +812,16 @@ impl Stream for GroupedHashAggregateStream { // Emit+reset: when the hash table exceeds the // size limit, emit all accumulated state and // reset the table to bound memory usage. + // + // Only activate after the skip probe has evaluated + // and decided NOT to skip. This prevents emit+reset + // from cycling on high-cardinality queries where + // skip_aggregation should take over instead. if self.early_emit_max_table_size > 0 + && match &self.skip_aggregation_probe { + None => true, + Some(p) => p.has_evaluated() && !p.should_skip(), + } && self.table_size() >= self.early_emit_max_table_size { let batch_size = self.batch_size;