diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index 4ba57a1530e81..7544f7ae26d43 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -102,61 +102,104 @@ const NUM_STREAMS: usize = 8; /// The size of each batch within each stream const BATCH_SIZE: usize = 1024; -/// Total number of input rows to generate -const INPUT_SIZE: u64 = 100000; +/// Input sizes to benchmark. The small size (100K) exercises the +/// in-memory concat-and-sort path; the large size (1M) exercises +/// the sort-then-merge path with high fan-in. +const INPUT_SIZES: &[(u64, &str)] = &[(100_000, "100k"), (1_000_000, "1M")]; type PartitionedBatches = Vec>; +type StreamGenerator = Box PartitionedBatches>; fn criterion_benchmark(c: &mut Criterion) { - let cases: Vec<(&str, &dyn Fn(bool) -> PartitionedBatches)> = vec![ - ("i64", &i64_streams), - ("f64", &f64_streams), - ("utf8 low cardinality", &utf8_low_cardinality_streams), - ("utf8 high cardinality", &utf8_high_cardinality_streams), - ( - "utf8 view low cardinality", - &utf8_view_low_cardinality_streams, - ), - ( - "utf8 view high cardinality", - &utf8_view_high_cardinality_streams, - ), - ("utf8 tuple", &utf8_tuple_streams), - ("utf8 view tuple", &utf8_view_tuple_streams), - ("utf8 dictionary", &dictionary_streams), - ("utf8 dictionary tuple", &dictionary_tuple_streams), - ("mixed dictionary tuple", &mixed_dictionary_tuple_streams), - ("mixed tuple", &mixed_tuple_streams), - ( - "mixed tuple with utf8 view", - &mixed_tuple_with_utf8_view_streams, - ), - ]; - - for (name, f) in cases { - c.bench_function(&format!("merge sorted {name}"), |b| { - let data = f(true); - let case = BenchCase::merge_sorted(&data); - b.iter(move || case.run()) - }); - - c.bench_function(&format!("sort merge {name}"), |b| { - let data = f(false); - let case = BenchCase::sort_merge(&data); - b.iter(move || case.run()) - }); - - c.bench_function(&format!("sort {name}"), |b| { - let data = f(false); - let case 
= BenchCase::sort(&data); - b.iter(move || case.run()) - }); - - c.bench_function(&format!("sort partitioned {name}"), |b| { - let data = f(false); - let case = BenchCase::sort_partitioned(&data); - b.iter(move || case.run()) - }); + for &(input_size, size_label) in INPUT_SIZES { + let cases: Vec<(&str, StreamGenerator)> = vec![ + ( + "i64", + Box::new(move |sorted| i64_streams(sorted, input_size)), + ), + ( + "f64", + Box::new(move |sorted| f64_streams(sorted, input_size)), + ), + ( + "utf8 low cardinality", + Box::new(move |sorted| utf8_low_cardinality_streams(sorted, input_size)), + ), + ( + "utf8 high cardinality", + Box::new(move |sorted| utf8_high_cardinality_streams(sorted, input_size)), + ), + ( + "utf8 view low cardinality", + Box::new(move |sorted| { + utf8_view_low_cardinality_streams(sorted, input_size) + }), + ), + ( + "utf8 view high cardinality", + Box::new(move |sorted| { + utf8_view_high_cardinality_streams(sorted, input_size) + }), + ), + ( + "utf8 tuple", + Box::new(move |sorted| utf8_tuple_streams(sorted, input_size)), + ), + ( + "utf8 view tuple", + Box::new(move |sorted| utf8_view_tuple_streams(sorted, input_size)), + ), + ( + "utf8 dictionary", + Box::new(move |sorted| dictionary_streams(sorted, input_size)), + ), + ( + "utf8 dictionary tuple", + Box::new(move |sorted| dictionary_tuple_streams(sorted, input_size)), + ), + ( + "mixed dictionary tuple", + Box::new(move |sorted| { + mixed_dictionary_tuple_streams(sorted, input_size) + }), + ), + ( + "mixed tuple", + Box::new(move |sorted| mixed_tuple_streams(sorted, input_size)), + ), + ( + "mixed tuple with utf8 view", + Box::new(move |sorted| { + mixed_tuple_with_utf8_view_streams(sorted, input_size) + }), + ), + ]; + + for (name, f) in &cases { + c.bench_function(&format!("merge sorted {name} {size_label}"), |b| { + let data = f(true); + let case = BenchCase::merge_sorted(&data); + b.iter(move || case.run()) + }); + + c.bench_function(&format!("sort merge {name} {size_label}"), |b| { + let 
data = f(false); + let case = BenchCase::sort_merge(&data); + b.iter(move || case.run()) + }); + + c.bench_function(&format!("sort {name} {size_label}"), |b| { + let data = f(false); + let case = BenchCase::sort(&data); + b.iter(move || case.run()) + }); + + c.bench_function(&format!("sort partitioned {name} {size_label}"), |b| { + let data = f(false); + let case = BenchCase::sort_partitioned(&data); + b.iter(move || case.run()) + }); + } } } @@ -279,8 +322,8 @@ fn make_sort_exprs(schema: &Schema) -> LexOrdering { } /// Create streams of int64 (where approximately 1/3 values is repeated) -fn i64_streams(sorted: bool) -> PartitionedBatches { - let mut values = DataGenerator::new().i64_values(); +fn i64_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).i64_values(); if sorted { values.sort_unstable(); } @@ -293,8 +336,8 @@ fn i64_streams(sorted: bool) -> PartitionedBatches { /// Create streams of f64 (where approximately 1/3 values are repeated) /// with the same distribution as i64_streams -fn f64_streams(sorted: bool) -> PartitionedBatches { - let mut values = DataGenerator::new().f64_values(); +fn f64_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).f64_values(); if sorted { values.sort_unstable_by(|a, b| a.total_cmp(b)); } @@ -306,8 +349,8 @@ fn f64_streams(sorted: bool) -> PartitionedBatches { } /// Create streams of random low cardinality utf8 values -fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches { - let mut values = DataGenerator::new().utf8_low_cardinality_values(); +fn utf8_low_cardinality_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).utf8_low_cardinality_values(); if sorted { values.sort_unstable(); } @@ -318,8 +361,11 @@ fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches { } /// Create streams of random low cardinality 
utf8_view values -fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches { - let mut values = DataGenerator::new().utf8_low_cardinality_values(); +fn utf8_view_low_cardinality_streams( + sorted: bool, + input_size: u64, +) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).utf8_low_cardinality_values(); if sorted { values.sort_unstable(); } @@ -330,8 +376,11 @@ fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches { } /// Create streams of high cardinality (~ no duplicates) utf8_view values -fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches { - let mut values = DataGenerator::new().utf8_high_cardinality_values(); +fn utf8_view_high_cardinality_streams( + sorted: bool, + input_size: u64, +) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).utf8_high_cardinality_values(); if sorted { values.sort_unstable(); } @@ -342,8 +391,8 @@ fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches { } /// Create streams of high cardinality (~ no duplicates) utf8 values -fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches { - let mut values = DataGenerator::new().utf8_high_cardinality_values(); +fn utf8_high_cardinality_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut values = DataGenerator::new(input_size).utf8_high_cardinality_values(); if sorted { values.sort_unstable(); } @@ -354,8 +403,8 @@ fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (utf8_low, utf8_low, utf8_high) -fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn utf8_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); // need to sort by the combined key, so combine them together let mut tuples: Vec<_> = data_gen @@ -387,8 +436,8 @@ fn utf8_tuple_streams(sorted: bool) -> 
PartitionedBatches { } /// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high) -fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn utf8_view_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); // need to sort by the combined key, so combine them together let mut tuples: Vec<_> = data_gen @@ -420,8 +469,8 @@ fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (f64, utf8_low, utf8_low, i64) -fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn mixed_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); // need to sort by the combined key, so combine them together let mut tuples: Vec<_> = data_gen @@ -458,8 +507,11 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (f64, utf8_view_low, utf8_view_low, i64) -fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn mixed_tuple_with_utf8_view_streams( + sorted: bool, + input_size: u64, +) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); // need to sort by the combined key, so combine them together let mut tuples: Vec<_> = data_gen @@ -496,8 +548,8 @@ fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (utf8_dict) -fn dictionary_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn dictionary_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); let mut values = data_gen.utf8_low_cardinality_values(); if sorted { values.sort_unstable(); @@ -511,8 +563,8 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (utf8_dict, 
utf8_dict, utf8_dict) -fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn dictionary_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); let mut tuples: Vec<_> = data_gen .utf8_low_cardinality_values() .into_iter() @@ -542,8 +594,8 @@ fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { } /// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64) -fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut data_gen = DataGenerator::new(); +fn mixed_dictionary_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches { + let mut data_gen = DataGenerator::new(input_size); let mut tuples: Vec<_> = data_gen .utf8_low_cardinality_values() .into_iter() @@ -579,19 +631,21 @@ fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { /// Encapsulates creating data for this test struct DataGenerator { rng: StdRng, + input_size: u64, } impl DataGenerator { - fn new() -> Self { + fn new(input_size: u64) -> Self { Self { rng: StdRng::seed_from_u64(42), + input_size, } } /// Create an array of i64 sorted values (where approximately 1/3 values is repeated) fn i64_values(&mut self) -> Vec { - let mut vec: Vec<_> = (0..INPUT_SIZE) - .map(|_| self.rng.random_range(0..INPUT_SIZE as i64)) + let mut vec: Vec<_> = (0..self.input_size) + .map(|_| self.rng.random_range(0..self.input_size as i64)) .collect(); vec.sort_unstable(); @@ -614,7 +668,7 @@ impl DataGenerator { .collect::>(); // pick from the 100 strings randomly - let mut input = (0..INPUT_SIZE) + let mut input = (0..self.input_size) .map(|_| { let idx = self.rng.random_range(0..strings.len()); let s = Arc::clone(&strings[idx]); @@ -629,7 +683,7 @@ impl DataGenerator { /// Create sorted values of high cardinality (~ no duplicates) utf8 values fn utf8_high_cardinality_values(&mut self) -> Vec> { // make random strings - let mut input = 
(0..INPUT_SIZE) + let mut input = (0..self.input_size) .map(|_| Some(self.random_string())) .collect::>();