perf: Optimize ExternalSorter with chunked sort pipeline#21600
mbutrovich wants to merge 25 commits into apache:main
…to choose between sort implementations.
    } else if self.sorted_runs_memory > reservation_size {
        self.reservation
            .grow(self.sorted_runs_memory - reservation_size);
    }
The comment says this would only exceed the limit by a small amount, but wouldn't that compound across partitions when there are many of them?
Maybe we could still use `try_grow` here and recover from the failure, or at least cap the grow amount?
I updated the comments to explain the scenario a bit more, but let me know if you still think we should do something more strict.
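To make the concern about compounding concrete, here is a minimal sketch of the difference between an infallible grow and a fallible one. `ToyPool`, `ToyReservation`, and `reconcile` are hypothetical stand-ins written for this illustration; they are not DataFusion's memory pool types, and the real accounting in the PR may differ.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a shared memory pool with a hard limit.
struct ToyPool {
    limit: usize,
    used: AtomicUsize,
}

/// Hypothetical stand-in for one partition's reservation against the pool.
struct ToyReservation {
    pool: Arc<ToyPool>,
    size: usize,
}

impl ToyReservation {
    /// Infallible grow: always succeeds, so the pool total can exceed `limit`.
    fn grow(&mut self, bytes: usize) {
        self.pool.used.fetch_add(bytes, Ordering::SeqCst);
        self.size += bytes;
    }

    /// Fallible grow: leaves the pool untouched if `limit` would be exceeded.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let limit = self.pool.limit;
        self.pool
            .used
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
                used.checked_add(bytes).filter(|total| *total <= limit)
            })
            .map_err(|used| format!("cannot reserve {bytes} bytes: {used} of {limit} in use"))?;
        self.size += bytes;
        Ok(())
    }
}

/// Bring the reservation up to the memory held by sorted runs.
/// Returns `false` when the pool is full, so the caller can react (for example by spilling).
fn reconcile(reservation: &mut ToyReservation, sorted_runs_memory: usize) -> bool {
    if sorted_runs_memory > reservation.size {
        let shortfall = sorted_runs_memory - reservation.size;
        reservation.try_grow(shortfall).is_ok()
    } else {
        true
    }
}
```

With eight partitions each growing by 200 bytes against a 1000-byte limit, the infallible path ends at 1600 bytes in use, while the fallible path admits five partitions and reports failure to the other three.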
    let use_radix_for_this_batch =
        self.use_radix && batch.num_rows() > self.batch_size;
nit: since there is already a gate in sort_batch that handles radix vs lexsort maybe we could change the name of this variable for readability to something like use_chunked_radix?
Also, just realizing: the "graceful degradation" section says the pipeline falls back to lexsort below `batch_size` rows, but wouldn't the else branch here still take the radix path? When `use_radix_for_this_batch` is false, this calls `sort_batch`, and `sort_batch` independently checks `use_radix_sort` when `fetch.is_none()`, so for radix-eligible schemas it takes the radix path regardless of row count. Wouldn't that be slower?
Thanks for the feedback @gratus00! I addressed both of these. Checking per-batch is wasteful too. I created an inner function that we can call directly because sort_batch is a public API and I don't want to change it.
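The idea of deciding the algorithm once per batch and then calling an inner function with that decision can be sketched roughly as follows. Everything here is illustrative: `SortAlgo`, `SorterConfig`, `choose_algo`, and `sort_rows_with` are hypothetical names, the `>=` threshold is an assumption based on "the batch reached `sort_coalesce_target_rows`", and the LSD radix sort over packed `u64` keys is only a stand-in for the row-encoded radix sort the PR builds on.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum SortAlgo {
    Radix,
    Lexsort,
}

/// Hypothetical mirror of the two config values discussed in this PR.
struct SorterConfig {
    sort_use_radix: bool,
    sort_coalesce_target_rows: usize,
}

/// Single gate: radix only if enabled, the schema is eligible, and the batch is large enough.
fn choose_algo(cfg: &SorterConfig, schema_radix_eligible: bool, num_rows: usize) -> SortAlgo {
    if cfg.sort_use_radix && schema_radix_eligible && num_rows >= cfg.sort_coalesce_target_rows {
        SortAlgo::Radix
    } else {
        SortAlgo::Lexsort
    }
}

/// LSD radix sort over u64 keys, one byte per pass (stand-in for the real radix kernel).
fn radix_sort_u64(keys: &mut Vec<u64>) {
    let mut buf = vec![0u64; keys.len()];
    for pass in 0..8 {
        let shift = pass * 8;
        let mut counts = [0usize; 257];
        for &k in keys.iter() {
            counts[((k >> shift) & 0xFF) as usize + 1] += 1;
        }
        // Prefix sums turn the histogram into bucket start offsets.
        for i in 0..256 {
            counts[i + 1] += counts[i];
        }
        for &k in keys.iter() {
            let bucket = ((k >> shift) & 0xFF) as usize;
            buf[counts[bucket]] = k;
            counts[bucket] += 1;
        }
        std::mem::swap(keys, &mut buf);
    }
}

/// Inner function: the caller has already decided, so no second gate runs here.
fn sort_rows_with(algo: SortAlgo, rows: &mut Vec<(u32, u32)>) {
    match algo {
        SortAlgo::Lexsort => rows.sort_unstable(),
        SortAlgo::Radix => {
            // Pack both columns into one order-preserving key, similar in spirit to row encoding.
            let mut keys: Vec<u64> = rows
                .iter()
                .map(|&(a, b)| ((a as u64) << 32) | b as u64)
                .collect();
            radix_sort_u64(&mut keys);
            for (row, key) in rows.iter_mut().zip(keys) {
                *row = ((key >> 32) as u32, key as u32);
            }
        }
    }
}
```

Keeping the decision in one place means a small batch cannot reach the radix path through a second, independent check, which is the situation the review comment above describes.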
- Always coalesce to `sort_coalesce_target_rows` regardless of schema (removed conditional that fell back to `batch_size` for non-radix)
- Both radix and lexsort paths now go through `sort_batch_chunked` (both chunk output to `batch_size`)
- Per-batch radix decision uses `sort_coalesce_target_rows` as threshold instead of `batch_size`
- Added `radix_sorted_batches` and `lexsort_sorted_batches` counters to `ExternalSorterMetrics`
- Added `sort_coalesce_target_rows` and `sort_use_radix` config fields to `ExternalSorter`
- New `sort_use_radix` parameter gates the `use_radix_sort()` schema check

## `datafusion/common/src/config.rs`
- New config: `sort_use_radix: bool, default = true`
- Updated `sort_coalesce_target_rows` doc

## `datafusion/execution/src/config.rs`
- New builder method: `with_sort_use_radix()`

## `datafusion/core/tests/fuzz_cases/sort_fuzz.rs`
- `(20000, false)` → `(50000, true)` to fix flaky test

## `datafusion/sqllogictest/test_files/information_schema.slt` + `docs/source/user-guide/configs.md`
- Added `sort_use_radix` entry, updated `sort_coalesce_target_rows` description
🤖 Benchmark running (GKE). Comparing sort_redesign (73bb06b) to 0143dfe (merge-base) using: tpch10
🤖 Benchmark completed (GKE): tpch10, base (merge-base) vs. branch. Details and resource usage were not captured.
So this shows the improvement afforded by both the ExternalSorter rewrite (which helps lexsort by reducing fan-in) and radix sorting. I will push a commit that defaults radix sort off and run the benchmarks again to get a baseline understanding of the ExternalSorter changes.
run benchmark tpch10
🤖 Benchmark running (GKE). Comparing sort_redesign (482e72c) to 0143dfe (merge-base) using: tpch10
Draft for benchmarking, builds on #21525.
Which issue does this PR close?
Partially addresses #21543.
Rationale for this change
ExternalSorter's merge path sorts each incoming batch individually (typically 8192 rows), then k-way merges all of them. This creates two problems:
… `lexsort_to_indices` at 32K+ rows for multi-column sorts, but at 8K rows the `RowConverter` encoding cost dominates. TPC-H SF10 benchmarks on #21525 (perf: Bring over apache/arrow-rs/9683 radix sort, integrate into ExternalSorter) confirmed this: naively swapping in radix sort made 12/22 queries slower (up to 1.20x).

What changes are included in this PR?
Chunked sort pipeline
Replaces ExternalSorter's buffer-then-sort architecture with a coalesce-then-sort pipeline:
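- Coalesce incoming batches with `BatchCoalescer` until `sort_coalesce_target_rows` (default 32768) is reached
- Sort each coalesced batch and chunk the output to `batch_size`
- Merge the sorted runs with `StreamingMergeBuilder`

Uniform coalescing, per-batch algorithm selection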
All schemas coalesce to `sort_coalesce_target_rows`. This reduces merge fan-in for all queries, including single-column sorts like sort-merge join keys.

Per batch, radix sort is used when the schema is eligible (multi-column, primitives/strings) and the batch reached `sort_coalesce_target_rows`. Otherwise lexsort is used. A `sort_use_radix` config (default `true`) allows disabling radix entirely to isolate the pipeline's contribution.

Metrics
New `radix_sorted_batches` and `lexsort_sorted_batches` counters in `ExternalSorterMetrics`, visible in `EXPLAIN ANALYZE`.

Dead code removal
Sorted runs no longer require an in-memory merge before spilling. Removes `in_mem_sort_stream`, `sort_batch_stream`, `consume_and_spill_append`, `spill_finish`, `organize_stringview_arrays`, and `in_progress_spill_file`.

Config changes
- `sort_coalesce_target_rows` (default 32768)
- `sort_use_radix` (default true)
- `sort_in_place_threshold_bytes` (no longer read, `warn` attribute per API health policy)

Are these changes tested?
- `information_schema.slt` updated for new configs

Are there any user-facing changes?
- `sort_coalesce_target_rows` (default 32768) controls coalesce target
- `sort_use_radix` (default true) enables/disables radix sort
- `radix_sorted_batches` and `lexsort_sorted_batches` in `EXPLAIN ANALYZE`
- `sort_in_place_threshold_bytes` is deprecated
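
To illustrate what the coalesce target controls, the coalesce-then-sort pipeline described under "Chunked sort pipeline" can be sketched on plain `Vec<i64>` batches. This is a simplified model under stated assumptions, not the PR's implementation: `finish_run`, `build_sorted_runs`, and `merge_runs` are hypothetical names, a run here may slightly exceed the target when input batches do not divide it evenly, and a `BinaryHeap` stands in for the streaming merge.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Sort the buffered rows and split them into chunks of at most `batch_size` rows.
fn finish_run(buffer: &mut Vec<i64>, batch_size: usize) -> Vec<Vec<i64>> {
    buffer.sort_unstable();
    let chunks: Vec<Vec<i64>> = buffer.chunks(batch_size).map(|c| c.to_vec()).collect();
    buffer.clear();
    chunks
}

/// Coalesce input batches until `target_rows` is reached, then emit one sorted run.
/// Each run is a list of sorted chunks; the number of runs is the merge fan-in.
fn build_sorted_runs(
    input: impl IntoIterator<Item = Vec<i64>>,
    target_rows: usize,
    batch_size: usize,
) -> Vec<Vec<Vec<i64>>> {
    assert!(target_rows > 0 && batch_size > 0);
    let mut runs = Vec::new();
    let mut buffer: Vec<i64> = Vec::new();
    for batch in input {
        buffer.extend(batch);
        if buffer.len() >= target_rows {
            runs.push(finish_run(&mut buffer, batch_size));
        }
    }
    // Flush whatever is left as a final, smaller run.
    if !buffer.is_empty() {
        runs.push(finish_run(&mut buffer, batch_size));
    }
    runs
}

/// K-way merge of sorted runs using a min-heap keyed on (value, run index).
fn merge_runs(runs: Vec<Vec<Vec<i64>>>) -> Vec<i64> {
    // Each run is read as one flat sorted stream over its chunks.
    let mut streams: Vec<_> = runs
        .into_iter()
        .map(|run| run.into_iter().flatten())
        .collect();
    let mut heap = BinaryHeap::new();
    for (idx, stream) in streams.iter_mut().enumerate() {
        if let Some(value) = stream.next() {
            heap.push(Reverse((value, idx)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((value, idx))) = heap.pop() {
        out.push(value);
        if let Some(next) = streams[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    out
}
```

In this model, 16 input batches of 8192 rows produce 4 sorted runs with a target of 32768, versus 16 runs when every batch is sorted on its own, so the merge compares across a quarter as many streams.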