feat: early group emission with emit+reset for partial aggregation #21595
Dandandan wants to merge 8 commits into apache:main
When the partial aggregate's hash table exceeds a configurable size threshold (default: 4MB), use a two-generation scheme to emit intermediate state while keeping the hash table cache-friendly.

When the hot hash table fills up:
1. Emit the cold batch (previous generation's state) downstream
2. Promote the current hot table state to the cold batch
3. Reset the hot hash table and continue reading input

This gives recurring groups a second chance to be merged locally before being sent downstream, reducing the number of partial emissions through the hash repartition while keeping the working set in CPU cache.

At end-of-input, the remaining hot state and cold batch are emitted as separate batches (no concat_batches copy). Cold batch is also correctly flushed when transitioning to SkippingAggregation or Done.

New config: datafusion.execution.partial_aggregation_max_table_size

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
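The three steps in the commit message above can be illustrated with a toy model. This is a minimal sketch, not the PR's code: `TwoGenAggregator`, its fields, and the use of a group-count limit (`max_groups`) in place of the byte-size threshold are all hypothetical, and it models only the emit / promote / reset ordering for a `SUM` aggregate, not how DataFusion stores or merges the cold batch.

```rust
use std::collections::HashMap;

/// Hypothetical toy partial aggregator for `SELECT key, SUM(value) ... GROUP BY key`.
/// `max_groups` stands in for the byte-size threshold from the commit message.
pub struct TwoGenAggregator {
    hot: HashMap<u64, i64>,
    cold: Option<Vec<(u64, i64)>>,
    max_groups: usize,
}

impl TwoGenAggregator {
    pub fn new(max_groups: usize) -> Self {
        Self { hot: HashMap::new(), cold: None, max_groups }
    }

    /// Aggregates one row. Returns the previous cold batch (if any) when the
    /// hot table has just filled up.
    pub fn push(&mut self, key: u64, value: i64) -> Option<Vec<(u64, i64)>> {
        *self.hot.entry(key).or_insert(0) += value;
        if self.hot.len() < self.max_groups {
            return None;
        }
        // 1. Emit the cold batch (previous generation's state) downstream.
        let emitted = self.cold.take();
        // 2. Promote the current hot state to the cold batch.
        // 3. `drain` leaves the hot table empty, which is the reset.
        self.cold = Some(self.hot.drain().collect());
        emitted
    }

    /// At end of input, emits the cold batch and the remaining hot state
    /// as separate batches, without concatenating them.
    pub fn finish(mut self) -> Vec<Vec<(u64, i64)>> {
        let mut out = Vec::new();
        if let Some(cold) = self.cold.take() {
            out.push(cold);
        }
        if !self.hot.is_empty() {
            out.push(self.hot.drain().collect());
        }
        out
    }
}
```

In this model the first time the hot table fills, nothing is emitted because there is no cold batch yet; output starts only on the second fill.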
…and dedup

Gate early emission on the skip-aggregation probe ratio (>= 0.5) so queries with good aggregation reduction are not penalized. Track cold batch memory in the reservation to avoid silent OOM. Extract table_size() and next_state_draining_cold() helpers to reduce duplication.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The early emission was firing before the skip aggregation probe could evaluate (needs 100K rows), causing regressions for very high-cardinality GROUP BY queries (e.g. Q32 GROUP BY WatchID, ClientIP was 1.53x slower).

Fix: only enable early emission AFTER the skip probe has evaluated and decided NOT to skip. This ensures:
- Before 100K rows: no early emission (let probe evaluate first)
- High cardinality (ratio >= 0.8): skip probe takes over, no emission
- Medium cardinality (ratio < 0.8): early emission keeps hash table cache-friendly

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
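The gating order described in this commit message can be written as a small decision function. This is a hedged sketch: `Strategy`, `choose_strategy`, and both constants are hypothetical names; the 100K-row and 0.8 values come from the message, and treating a limit of 0 as "disabled" follows the PR description.

```rust
/// Rows the skip probe needs before it can decide (100K per the commit message).
const PROBE_ROWS_THRESHOLD: usize = 100_000;
/// Groups-to-rows ratio at or above which partial aggregation is skipped.
const SKIP_RATIO_THRESHOLD: f64 = 0.8;

#[derive(Debug, PartialEq)]
pub enum Strategy {
    KeepAggregating,
    SkipAggregation,
    EarlyEmission,
}

/// Hypothetical helper: decides what the partial aggregate should do next.
pub fn choose_strategy(
    input_rows: usize,
    num_groups: usize,
    table_bytes: usize,
    max_table_bytes: usize, // 0 disables early emission
) -> Strategy {
    // Before the probe has seen enough rows, never emit early.
    if input_rows < PROBE_ROWS_THRESHOLD {
        return Strategy::KeepAggregating;
    }
    let ratio = num_groups as f64 / input_rows as f64;
    if ratio >= SKIP_RATIO_THRESHOLD {
        // High cardinality: the skip probe takes over.
        Strategy::SkipAggregation
    } else if max_table_bytes > 0 && table_bytes > max_table_bytes {
        // Medium cardinality and the table outgrew the limit.
        Strategy::EarlyEmission
    } else {
        Strategy::KeepAggregating
    }
}
```

The point of the ordering is that the size check is reached only after the probe has had its chance to choose skipping.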
Replace the two-generation cold-batch emission scheme with a simpler bounded hash table approach:
- When the hash table exceeds the size limit, switch to overflow passthrough mode: subsequent batches are converted directly to intermediate state (like SkippingAggregation) while the hash table retains its accumulated groups
- At end-of-input, emit the hash table's accumulated state
- No emit/reset cycle, no cold batch, no serialization overhead
- The hash table acts as a cache for groups seen early in the stream

Benefits over two-generation:
- No RecordBatch serialization/deserialization of accumulated state
- No hash table rebuild after reset
- Simpler state machine (no cold batch management)
- Groups accumulated before overflow stay aggregated

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
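A toy model of the overflow passthrough mode described above, reading the message literally: once the table hits its limit, every later row is converted straight to partial state and the table is left untouched until end of input. `BoundedAggregator` and `max_groups` are hypothetical, a group count stands in for the byte-size limit, and the partial state of a `SUM` over one row is assumed to be the row's own value.

```rust
use std::collections::HashMap;

/// Hypothetical bounded partial aggregator for `SUM(value) GROUP BY key`.
pub struct BoundedAggregator {
    table: HashMap<u64, i64>,
    max_groups: usize,
    overflowed: bool,
}

impl BoundedAggregator {
    pub fn new(max_groups: usize) -> Self {
        Self { table: HashMap::new(), max_groups, overflowed: false }
    }

    /// Returns `Some(state)` when the row is passed through as single-row
    /// partial state, or `None` when it was absorbed into the hash table.
    pub fn push(&mut self, key: u64, value: i64) -> Option<(u64, i64)> {
        if self.overflowed {
            // Overflow mode: no lookup, no insert, just convert to state.
            return Some((key, value));
        }
        *self.table.entry(key).or_insert(0) += value;
        if self.table.len() >= self.max_groups {
            self.overflowed = true;
        }
        None
    }

    /// At end of input, emits the groups accumulated before overflow.
    pub fn finish(self) -> Vec<(u64, i64)> {
        self.table.into_iter().collect()
    }
}
```

Note that in this sketch a key already present in the table is still passed through after overflow, so downstream sees both the early aggregate and the later single-row states for it.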
COUNT(DISTINCT) and similar distinct aggregates produce per-row intermediate state when convert_to_state is called. In overflow mode this turns 100M rows into 100M single-value state objects that the downstream must merge, a 12x regression on Q9.

Fix: skip overflow passthrough when any aggregate is distinct.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace the two-strategy approach (overflow passthrough for non-distinct, emit+reset for distinct) with a single emit+reset strategy for all partial aggregation cases. When the hash table exceeds the size limit, emit all accumulated state and reset the table.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
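The single emit+reset strategy can be sketched with the same kind of toy model. Everything named here (`EmitResetAggregator`, `merge_partials`, `max_groups`) is hypothetical, and a group count again stands in for the byte-size limit. The `merge_partials` function plays the role of the downstream final aggregation, to show that a key emitted in several partial batches still ends up with the correct total.

```rust
use std::collections::HashMap;

/// Hypothetical emit+reset partial aggregator for `SUM(value) GROUP BY key`.
pub struct EmitResetAggregator {
    table: HashMap<u64, i64>,
    max_groups: usize,
}

impl EmitResetAggregator {
    pub fn new(max_groups: usize) -> Self {
        Self { table: HashMap::new(), max_groups }
    }

    /// Aggregates one row; when the table reaches the limit, emits all
    /// accumulated state and leaves the table empty.
    pub fn push(&mut self, key: u64, value: i64) -> Option<Vec<(u64, i64)>> {
        *self.table.entry(key).or_insert(0) += value;
        if self.table.len() >= self.max_groups {
            Some(self.table.drain().collect())
        } else {
            None
        }
    }

    /// Emits whatever is left at end of input.
    pub fn finish(self) -> Vec<(u64, i64)> {
        self.table.into_iter().collect()
    }
}

/// Stand-in for the final aggregation: merges partial sums per key.
pub fn merge_partials(batches: &[Vec<(u64, i64)>]) -> HashMap<u64, i64> {
    let mut merged = HashMap::new();
    for batch in batches {
        for &(key, partial_sum) in batch {
            *merged.entry(key).or_insert(0) += partial_sum;
        }
    }
    merged
}
```

The trade-off visible even in this toy is that a recurring key may be emitted more than once, so downstream merges more partial rows in exchange for a small working set upstream.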
Only activate emit+reset after the skip probe has evaluated and decided NOT to skip. This prevents constant emit+reset cycles on high-cardinality queries where skip_aggregation should take over instead.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Which issue does this PR close?
Related to improving GROUP BY performance on large datasets (e.g., ClickBench).
Rationale for this change
The AggregateExec(Partial) accumulates ALL input rows into a single hash table before emitting. For medium-to-high cardinality GROUP BY queries, the hash table grows far beyond CPU cache capacity, causing poor cache performance during hash table lookups.
The existing skip_partial_aggregation handles the extreme case (ratio >= 0.8) by stopping aggregation entirely. But for medium cardinality (ratio 0.01-0.5), the hash table is large enough to cause cache misses yet small enough that aggregation is still beneficial.
What changes are included in this PR?
Early group emission with emit+reset: When the partial aggregate's hash table exceeds a configurable size threshold (default: 4MB), it emits all accumulated groups as partial state and resets the table:
state() as partial aggregate state
Key properties:
New config: datafusion.execution.partial_aggregation_max_table_size (default: 4MB, 0 to disable)
Are these changes tested?
All existing aggregate SLT tests pass with the default threshold enabled.
Are there any user-facing changes?
New configuration option datafusion.execution.partial_aggregation_max_table_size. The default (4MB) is enabled automatically. Set to 0 to disable.
🤖 Generated with Claude Code