From 551709c1b29dc79d5b442cc88f557773572f8e83 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 14 Apr 2026 15:54:48 -0400 Subject: [PATCH 1/3] Add a 10M row case to the sort benchmark. 100k is too small and frequently takes the <1MB sort in-place path of the existing ExternalSorter. We want to see larger cases too. --- datafusion/core/benches/sort.rs | 217 ++++++++++++++++++++------------ 1 file changed, 135 insertions(+), 82 deletions(-) diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index 4ba57a1530e81..53f59a449f592 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -102,61 +102,103 @@ const NUM_STREAMS: usize = 8; /// The size of each batch within each stream const BATCH_SIZE: usize = 1024; -/// Total number of input rows to generate -const INPUT_SIZE: u64 = 100000; +/// Input sizes to benchmark. The small size (100K) exercises the +/// in-memory concat-and-sort path; the large size (10M) exercises +/// the sort-then-merge path with high fan-in. 
+const INPUT_SIZES: &[(u64, &str)] = &[(100_000, "100k"), (10_000_000, "10M")]; type PartitionedBatches = Vec>; fn criterion_benchmark(c: &mut Criterion) { - let cases: Vec<(&str, &dyn Fn(bool) -> PartitionedBatches)> = vec![ - ("i64", &i64_streams), - ("f64", &f64_streams), - ("utf8 low cardinality", &utf8_low_cardinality_streams), - ("utf8 high cardinality", &utf8_high_cardinality_streams), - ( - "utf8 view low cardinality", - &utf8_view_low_cardinality_streams, - ), - ( - "utf8 view high cardinality", - &utf8_view_high_cardinality_streams, - ), - ("utf8 tuple", &utf8_tuple_streams), - ("utf8 view tuple", &utf8_view_tuple_streams), - ("utf8 dictionary", &dictionary_streams), - ("utf8 dictionary tuple", &dictionary_tuple_streams), - ("mixed dictionary tuple", &mixed_dictionary_tuple_streams), - ("mixed tuple", &mixed_tuple_streams), - ( - "mixed tuple with utf8 view", - &mixed_tuple_with_utf8_view_streams, - ), - ]; - - for (name, f) in cases { - c.bench_function(&format!("merge sorted {name}"), |b| { - let data = f(true); - let case = BenchCase::merge_sorted(&data); - b.iter(move || case.run()) - }); - - c.bench_function(&format!("sort merge {name}"), |b| { - let data = f(false); - let case = BenchCase::sort_merge(&data); - b.iter(move || case.run()) - }); - - c.bench_function(&format!("sort {name}"), |b| { - let data = f(false); - let case = BenchCase::sort(&data); - b.iter(move || case.run()) - }); - - c.bench_function(&format!("sort partitioned {name}"), |b| { - let data = f(false); - let case = BenchCase::sort_partitioned(&data); - b.iter(move || case.run()) - }); + for &(input_size, size_label) in INPUT_SIZES { + let cases: Vec<(&str, Box PartitionedBatches>)> = vec![ + ( + "i64", + Box::new(move |sorted| i64_streams(sorted, input_size)), + ), + ( + "f64", + Box::new(move |sorted| f64_streams(sorted, input_size)), + ), + ( + "utf8 low cardinality", + Box::new(move |sorted| utf8_low_cardinality_streams(sorted, input_size)), + ), + ( + "utf8 high cardinality", 
+ Box::new(move |sorted| utf8_high_cardinality_streams(sorted, input_size)), + ), + ( + "utf8 view low cardinality", + Box::new(move |sorted| { + utf8_view_low_cardinality_streams(sorted, input_size) + }), + ), + ( + "utf8 view high cardinality", + Box::new(move |sorted| { + utf8_view_high_cardinality_streams(sorted, input_size) + }), + ), + ( + "utf8 tuple", + Box::new(move |sorted| utf8_tuple_streams(sorted, input_size)), + ), + ( + "utf8 view tuple", + Box::new(move |sorted| utf8_view_tuple_streams(sorted, input_size)), + ), + ( + "utf8 dictionary", + Box::new(move |sorted| dictionary_streams(sorted, input_size)), + ), + ( + "utf8 dictionary tuple", + Box::new(move |sorted| dictionary_tuple_streams(sorted, input_size)), + ), + ( + "mixed dictionary tuple", + Box::new(move |sorted| { + mixed_dictionary_tuple_streams(sorted, input_size) + }), + ), + ( + "mixed tuple", + Box::new(move |sorted| mixed_tuple_streams(sorted, input_size)), + ), + ( + "mixed tuple with utf8 view", + Box::new(move |sorted| { + mixed_tuple_with_utf8_view_streams(sorted, input_size) + }), + ), + ]; + + for (name, f) in &cases { + c.bench_function(&format!("merge sorted {name} {size_label}"), |b| { + let data = f(true); + let case = BenchCase::merge_sorted(&data); + b.iter(move || case.run()) + }); + + c.bench_function(&format!("sort merge {name} {size_label}"), |b| { + let data = f(false); + let case = BenchCase::sort_merge(&data); + b.iter(move || case.run()) + }); + + c.bench_function(&format!("sort {name} {size_label}"), |b| { + let data = f(false); + let case = BenchCase::sort(&data); + b.iter(move || case.run()) + }); + + c.bench_function(&format!("sort partitioned {name} {size_label}"), |b| { + let data = f(false); + let case = BenchCase::sort_partitioned(&data); + b.iter(move || case.run()) + }); + } } } @@ -279,8 +321,8 @@ fn make_sort_exprs(schema: &Schema) -> LexOrdering { } /// Create streams of int64 (where approximately 1/3 values is repeated) -fn i64_streams(sorted: bool) -> 
PartitionedBatches { - let mut values = DataGenerator::new().i64_values(); +fn i64_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).i64_values(); if sorted { values.sort_unstable(); } @@ -293,8 +335,8 @@ fn i64_streams(sorted: bool) -> PartitionedBatches { /// Create streams of f64 (where approximately 1/3 values are repeated) /// with the same distribution as i64_streams -fn f64_streams(sorted: bool) -> PartitionedBatches { - let mut values = DataGenerator::new().f64_values(); +fn f64_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).f64_values(); if sorted { values.sort_unstable_by(|a, b| a.total_cmp(b)); } @@ -306,8 +348,8 @@ fn f64_streams(sorted: bool) -> PartitionedBatches { } /// Create streams of random low cardinality utf8 values -fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches { - let mut values = DataGenerator::new().utf8_low_cardinality_values(); +fn utf8_low_cardinality_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).utf8_low_cardinality_values(); if sorted { values.sort_unstable(); } @@ -318,8 +360,11 @@ fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches { } /// Create streams of random low cardinality utf8_view values -fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches { - let mut values = DataGenerator::new().utf8_low_cardinality_values(); +fn utf8_view_low_cardinality_streams( + sorted: bool, + input_size: u64, +) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).utf8_low_cardinality_values(); if sorted { values.sort_unstable(); } @@ -330,8 +375,11 @@ fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches { } /// Create streams of high cardinality (~ no duplicates) utf8_view values -fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches { - 
let mut values = DataGenerator::new().utf8_high_cardinality_values(); +fn utf8_view_high_cardinality_streams( + sorted: bool, + input_size: u64, +) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).utf8_high_cardinality_values(); if sorted { values.sort_unstable(); } @@ -342,8 +390,8 @@ fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches { } /// Create streams of high cardinality (~ no duplicates) utf8 values -fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches { - let mut values = DataGenerator::new().utf8_high_cardinality_values(); +fn utf8_high_cardinality_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).utf8_high_cardinality_values(); if sorted { values.sort_unstable(); } @@ -354,8 +402,8 @@ fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (utf8_low, utf8_low, utf8_high) -fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn utf8_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); // need to sort by the combined key, so combine them together let mut tuples: Vec<_> = data_gen @@ -387,8 +435,8 @@ fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high) -fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn utf8_view_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); // need to sort by the combined key, so combine them together let mut tuples: Vec<_> = data_gen @@ -420,8 +468,8 @@ fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (f64, utf8_low, utf8_low, i64) -fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut 
data_gen = DataGenerator::new(); +fn mixed_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); // need to sort by the combined key, so combine them together let mut tuples: Vec<_> = data_gen @@ -458,8 +506,11 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (f64, utf8_view_low, utf8_view_low, i64) -fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn mixed_tuple_with_utf8_view_streams( + sorted: bool, + input_size: u64, +) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); // need to sort by the combined key, so combine them together let mut tuples: Vec<_> = data_gen @@ -496,8 +547,8 @@ fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (utf8_dict) -fn dictionary_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn dictionary_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); let mut values = data_gen.utf8_low_cardinality_values(); if sorted { values.sort_unstable(); @@ -511,8 +562,8 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (utf8_dict, utf8_dict, utf8_dict) -fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn dictionary_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); let mut tuples: Vec<_> = data_gen .utf8_low_cardinality_values() .into_iter() @@ -542,8 +593,8 @@ fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64) -fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn 
mixed_dictionary_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); let mut tuples: Vec<_> = data_gen .utf8_low_cardinality_values() .into_iter() @@ -579,19 +630,21 @@ fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { /// Encapsulates creating data for this test struct DataGenerator { rng: StdRng, + input_size: u64, } impl DataGenerator { - fn new() -> Self { + fn new(input_size: u64) -> Self { Self { rng: StdRng::seed_from_u64(42), + input_size, } } /// Create an array of i64 sorted values (where approximately 1/3 values is repeated) fn i64_values(&mut self) -> Vec { - let mut vec: Vec<_> = (0..INPUT_SIZE) - .map(|_| self.rng.random_range(0..INPUT_SIZE as i64)) + let mut vec: Vec<_> = (0..self.input_size) + .map(|_| self.rng.random_range(0..self.input_size as i64)) .collect(); vec.sort_unstable(); @@ -614,7 +667,7 @@ impl DataGenerator { .collect::>(); // pick from the 100 strings randomly - let mut input = (0..INPUT_SIZE) + let mut input = (0..self.input_size) .map(|_| { let idx = self.rng.random_range(0..strings.len()); let s = Arc::clone(&strings[idx]); @@ -629,7 +682,7 @@ impl DataGenerator { /// Create sorted values of high cardinality (~ no duplicates) utf8 values fn utf8_high_cardinality_values(&mut self) -> Vec> { // make random strings - let mut input = (0..INPUT_SIZE) + let mut input = (0..self.input_size) .map(|_| Some(self.random_string())) .collect::>(); From 464b43a400e1d9ba98abdef579743ec0d081aca4 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 14 Apr 2026 16:29:25 -0400 Subject: [PATCH 2/3] Fix clippy. 
---
 datafusion/core/benches/sort.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 53f59a449f592..ea6ab74446af4 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -108,10 +108,11 @@ const BATCH_SIZE: usize = 1024;
 const INPUT_SIZES: &[(u64, &str)] = &[(100_000, "100k"), (10_000_000, "10M")];
 
 type PartitionedBatches = Vec<Vec<RecordBatch>>;
+type StreamGenerator = Box<dyn Fn(bool) -> PartitionedBatches>;
 
 fn criterion_benchmark(c: &mut Criterion) {
     for &(input_size, size_label) in INPUT_SIZES {
-        let cases: Vec<(&str, Box<dyn Fn(bool) -> PartitionedBatches>)> = vec![
+        let cases: Vec<(&str, StreamGenerator)> = vec![
             (
                 "i64",
                 Box::new(move |sorted| i64_streams(sorted, input_size)),

From 3d25d7ca20da3b2743bcf2592c16c71ede79da2c Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Wed, 15 Apr 2026 09:59:24 -0400
Subject: [PATCH 3/3] Reduce from 10M to 1M to bring runtime down; update the
 doc comment to match.

---
 datafusion/core/benches/sort.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index ea6ab74446af4..7544f7ae26d43 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -105,7 +105,7 @@ const BATCH_SIZE: usize = 1024;
 /// Input sizes to benchmark. The small size (100K) exercises the
-/// in-memory concat-and-sort path; the large size (10M) exercises
+/// in-memory concat-and-sort path; the large size (1M) exercises
 /// the sort-then-merge path with high fan-in.
-const INPUT_SIZES: &[(u64, &str)] = &[(100_000, "100k"), (10_000_000, "10M")];
+const INPUT_SIZES: &[(u64, &str)] = &[(100_000, "100k"), (1_000_000, "1M")];
 
 type PartitionedBatches = Vec<Vec<RecordBatch>>;
 type StreamGenerator = Box<dyn Fn(bool) -> PartitionedBatches>;