From 5a3975b3d0424801f30045a318a8226933f28d1c Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 06:38:47 +0000 Subject: [PATCH 1/5] feat: reorder row groups by statistics during sort pushdown When sort pushdown is active, reorder row groups within each file by their min/max statistics to match the requested sort order. This helps TopK queries find optimal values first via dynamic filter pushdown. - Add reorder_by_statistics to PreparedAccessPlan that sorts row_group_indexes by the first sort column's min values - Pass sort order from ParquetSource::try_pushdown_sort through to the opener via sort_order_for_reorder field - Reorder happens after pruning but before reverse (they compose) - Gracefully skips reorder when statistics unavailable, sort expr is not a simple column, row_selection present, or <=1 row groups Closes #21317 --- .../datasource-parquet/src/access_plan.rs | 326 ++++++++++++++++++ datafusion/datasource-parquet/src/opener.rs | 21 ++ datafusion/datasource-parquet/src/source.rs | 10 +- 3 files changed, 355 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index ca4d097c37a44..d98f64044db37 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -16,7 +16,12 @@ // under the License. use crate::sort::reverse_row_selection; +use arrow::datatypes::Schema; use datafusion_common::{Result, assert_eq_or_internal_err}; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use log::debug; +use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; @@ -377,6 +382,106 @@ impl PreparedAccessPlan { }) } + /// Reorder row groups by their min statistics for the given sort order. + /// + /// This helps TopK queries find optimal values first. For ASC sort, + /// row groups with the smallest min values come first. For DESC sort, + /// row groups with the largest min values come first. + /// + /// Gracefully skips reordering when: + /// - There is a row_selection (too complex to remap) + /// - 0 or 1 row groups (nothing to reorder) + /// - Sort expression is not a simple column reference + /// - Statistics are unavailable + pub(crate) fn reorder_by_statistics( + mut self, + sort_order: &LexOrdering, + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + ) -> Result { + // Skip if row_selection present (too complex to remap) + if self.row_selection.is_some() { + debug!("Skipping RG reorder: row_selection present"); + return Ok(self); + } + + // Nothing to reorder + if self.row_group_indexes.len() <= 1 { + return Ok(self); + } + + // Get the first sort expression + // LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr + let first_sort_expr = sort_order.first(); + + // Extract column name from sort expression + let column: &Column = match first_sort_expr.expr.as_any().downcast_ref::() + { + Some(col) => col, + None => { + debug!("Skipping RG reorder: sort expr is not a simple column"); + return Ok(self); + } + }; + + let descending = first_sort_expr.options.descending; + + // Build statistics converter for this column + let converter = match StatisticsConverter::try_new( + column.name(), + arrow_schema, + file_metadata.file_metadata().schema_descr(), + ) { + Ok(c) => c, + Err(e) => { + debug!("Skipping RG reorder: cannot create stats converter: {e}"); + return Ok(self); + } + }; + + // Get min values for the selected row groups + let rg_metadata: Vec<&RowGroupMetaData> = self + .row_group_indexes + .iter() + .map(|&idx| file_metadata.row_group(idx)) + .collect(); + + let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get min values: {e}"); + return Ok(self); + } + }; + + // Sort indices by min values + let sort_options = arrow::compute::SortOptions { + descending, + nulls_first: first_sort_expr.options.nulls_first, + }; + let sorted_indices = match arrow::compute::sort_to_indices( + &min_values, + Some(sort_options), + None, + ) { + Ok(indices) => indices, + Err(e) => { + debug!("Skipping RG reorder: sort failed: {e}"); + return Ok(self); + } + }; + + // Apply the reordering + let original_indexes = self.row_group_indexes.clone(); + self.row_group_indexes = sorted_indices + .values() + .iter() + .map(|&i| original_indexes[i as usize]) + .collect(); + + Ok(self) + } + /// Reverse the access plan for reverse scanning pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result { // Get the row group indexes before reversing @@ -614,4 +719,225 @@ mod test { .unwrap(); Arc::new(SchemaDescriptor::new(Arc::new(schema))) } + + // ---- reorder_by_statistics tests ---- + + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + use parquet::basic::Type as PhysicalType; + use parquet::file::metadata::FileMetaData; + use parquet::file::statistics::Statistics; + use parquet::schema::types::Type as SchemaType; + + /// Create ParquetMetaData with row groups that have Int32 min/max stats + fn make_metadata_with_stats(min_max_pairs: &[(i32, i32)]) -> ParquetMetaData { + let field = SchemaType::primitive_type_builder("id", PhysicalType::INT32) + .build() + .unwrap(); + let schema = SchemaType::group_type_builder("schema") + .with_fields(vec![Arc::new(field)]) + .build() + .unwrap(); + let schema_descr = Arc::new(SchemaDescriptor::new(Arc::new(schema))); + + let row_groups: Vec = min_max_pairs + .iter() + .map(|(min, max)| { + let stats = + Statistics::int32(Some(*min), Some(*max), None, Some(100), false); + let column = ColumnChunkMetaData::builder(schema_descr.column(0)) + .set_num_values(100) + .set_statistics(stats) + .build() + .unwrap(); + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(100) + .set_column_metadata(vec![column]) + .build() + .unwrap() + }) + .collect(); + + let file_meta = FileMetaData::new( + 1, + min_max_pairs.len() as i64 * 100, + None, + None, + schema_descr, + None, + ); + ParquetMetaData::new(file_meta, row_groups) + } + + fn make_sort_order_asc() -> LexOrdering { + LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new( + "id", 0, + )))]) + .unwrap() + } + + fn make_sort_order_desc() -> LexOrdering { + use arrow::compute::SortOptions; + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("id", 0)), + SortOptions { + descending: true, + nulls_first: false, + }, + )]) + .unwrap() + } + + fn make_arrow_schema() -> Schema { + Schema::new(vec![Field::new("id", DataType::Int32, false)]) + } + + #[test] + fn test_reorder_by_statistics_asc() { + // RGs in wrong order: [50-99, 200-299, 1-30] + let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should be reordered: RG2(1-30), RG0(50-99), RG1(200-299) + assert_eq!(plan.row_group_indexes, vec![2, 0, 1]); + } + + #[test] + fn test_reorder_by_statistics_desc() { + // RGs: [50-99, 200-299, 1-30] + let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_desc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // DESC: largest min first: RG1(200-299), RG0(50-99), RG2(1-30) + assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); + } + + #[test] + fn test_reorder_by_statistics_single_rg() { + let metadata = make_metadata_with_stats(&[(1, 100)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Single RG, no reorder + assert_eq!(plan.row_group_indexes, vec![0]); + } + + #[test] + fn test_reorder_by_statistics_with_skipped_rgs() { + // 4 RGs but only 0, 2, 3 are selected (RG1 was pruned) + let metadata = + make_metadata_with_stats(&[(300, 400), (100, 200), (1, 50), (50, 99)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 2, 3], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Reorder selected RGs by min: RG2(1-50), RG3(50-99), RG0(300-400) + assert_eq!(plan.row_group_indexes, vec![2, 3, 0]); + } + + #[test] + fn test_reorder_by_statistics_skips_with_row_selection() { + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let selection = RowSelection::from(vec![ + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(100), + ]); + + let plan = PreparedAccessPlan::new(vec![0, 1], Some(selection)).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because row_selection is present + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } + + #[test] + fn test_reorder_by_statistics_already_sorted() { + // Already in correct ASC order + let metadata = make_metadata_with_stats(&[(1, 30), (50, 99), (200, 299)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Already sorted, order preserved + assert_eq!(plan.row_group_indexes, vec![0, 1, 2]); + } + + #[test] + fn test_reorder_by_statistics_skips_non_column_expr() { + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::BinaryExpr; + + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + let schema = make_arrow_schema(); + + // Sort expression is a binary expression (id + 1), not a simple column + let expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new("id", 0)), + Operator::Plus, + Arc::new(datafusion_physical_expr::expressions::Literal::new( + datafusion_common::ScalarValue::Int32(Some(1)), + )), + )); + let sort_order = + LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap(); + + let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because sort expr is not a simple column + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } + + #[test] + fn test_reorder_by_statistics_skips_missing_column() { + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + // Schema has "id" but sort order references "nonexistent" + let schema = make_arrow_schema(); + let sort_order = LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new( + Column::new("nonexistent", 99), + ))]) + .unwrap(); + + let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because column not found in schema + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 5307cb15eeace..0cbae848a974b 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -54,6 +54,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ PhysicalExpr, is_dynamic_physical_expr, }; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, PruningMetrics, @@ -154,6 +155,10 @@ pub(super) struct ParquetMorselizer { pub max_predicate_cache_size: Option, /// Whether to read row groups in reverse order pub reverse_row_groups: bool, + /// Optional sort order used to reorder row groups by their min/max statistics. + /// When set, row groups are reordered before reading so that row groups likely + /// to contain optimal values (for TopK queries) are read first. + pub sort_order_for_reorder: Option, } impl fmt::Debug for ParquetMorselizer { @@ -304,6 +309,7 @@ struct PreparedParquetOpen { predicate_creation_errors: Count, max_predicate_cache_size: Option, reverse_row_groups: bool, + sort_order_for_reorder: Option, preserve_order: bool, #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, @@ -752,6 +758,7 @@ impl ParquetMorselizer { predicate_creation_errors, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder.clone(), preserve_order: self.preserve_order, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, @@ -1223,6 +1230,16 @@ impl RowGroupsPrunedParquetOpen { // Prepare the access plan (extract row groups and row selection) let mut prepared_plan = access_plan.prepare(rg_metadata)?; + // Reorder row groups by statistics if sort order is known. + // This helps TopK queries find optimal values first. + if let Some(sort_order) = &prepared.sort_order_for_reorder { + prepared_plan = prepared_plan.reorder_by_statistics( + sort_order, + file_metadata.as_ref(), + &prepared.physical_file_schema, + )?; + } + // Potentially reverse the access plan for performance. // See `ParquetSource::try_pushdown_sort` for the rationale. if prepared.reverse_row_groups { @@ -1731,6 +1748,7 @@ mod test { use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapterFactory, replace_columns_with_literals, }; + use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{Stream, StreamExt}; use object_store::{ObjectStore, ObjectStoreExt, memory::InMemory, path::Path}; @@ -1759,6 +1777,7 @@ mod test { coerce_int96: Option, max_predicate_cache_size: Option, reverse_row_groups: bool, + sort_order_for_reorder: Option, preserve_order: bool, } @@ -1785,6 +1804,7 @@ mod test { coerce_int96: None, max_predicate_cache_size: None, reverse_row_groups: false, + sort_order_for_reorder: None, preserve_order: false, } } @@ -1900,6 +1920,7 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder, }; ParquetOpener { morselizer } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 7fad6862fa721..9ed20e8252b30 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -55,7 +55,7 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use itertools::Itertools; use object_store::ObjectStore; #[cfg(feature = "parquet_encryption")] @@ -294,6 +294,8 @@ pub struct ParquetSource { /// so we still need to sort them after reading, so the reverse scan is inexact. /// Used to optimize ORDER BY ... DESC on sorted data. reverse_row_groups: bool, + /// Optional sort order used to reorder row groups by min/max statistics. + sort_order_for_reorder: Option, } impl ParquetSource { @@ -319,6 +321,7 @@ impl ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: None, reverse_row_groups: false, + sort_order_for_reorder: None, } } @@ -570,6 +573,7 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder.clone(), }, }); Ok(opener) @@ -817,7 +821,9 @@ impl FileSource for ParquetSource { // Return Inexact because we're only reversing row group order, // not guaranteeing perfect row-level ordering - let new_source = self.clone().with_reverse_row_groups(true); + let sort_order = LexOrdering::new(order.iter().cloned()); + let mut new_source = self.clone().with_reverse_row_groups(true); + new_source.sort_order_for_reorder = sort_order; Ok(SortOrderPushdownResult::Inexact { inner: Arc::new(new_source) as Arc, }) From adb25b31c3132036ca97a1b4124f5a0b022a8e3d Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 06:48:06 +0000 Subject: [PATCH 2/5] test: add SLT tests for row group reorder by statistics --- .../sqllogictest/test_files/sort_pushdown.slt | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index b6c75f3977010..4f41c4b08aecf 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2271,6 +2271,89 @@ DROP TABLE tg_src_high; statement ok DROP TABLE tg_buffer; +# =========================================================== +# Test H: Row group reorder by statistics for TopK queries. +# When a file has multiple row groups with overlapping or +# out-of-order statistics, sort pushdown returns Inexact and +# `reorder_by_statistics` reorders row groups within the file +# so TopK finds optimal values first. +# =========================================================== + +# Create a table with 30 rows and write to parquet with small row groups +# so we get multiple row groups per file. Rows are inserted in a mixed +# order so row groups span overlapping ranges (forcing Inexact path). +statement ok +CREATE TABLE th_mixed(id INT, value INT) AS VALUES + (15, 150), (5, 50), (25, 250), + (10, 100), (20, 200), (1, 10), + (30, 300), (3, 30), (18, 180); + +# Write with row_group_size=3 → 3 rows per RG, 3 RGs total +# RG statistics (unsorted order): RG0(5-25), RG1(1-20), RG2(3-30) +# Note: files are overlapping → Inexact path → TopK retained +query I +COPY (SELECT * FROM th_mixed) +TO 'test_files/scratch/sort_pushdown/th_reorder/data.parquet' +STORED AS PARQUET +OPTIONS ('format.max_row_group_size' '3'); +---- +9 + +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +CREATE EXTERNAL TABLE th_reorder(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/th_reorder/data.parquet'; + +# Test H.1: ASC ORDER BY with LIMIT — reorder helps TopK find min values first +# Results must be correct regardless of RG reorder. +query II +SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; +---- +1 10 +3 30 +5 50 + +# Test H.2: DESC ORDER BY with LIMIT — reorder + reverse compose +query II +SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; +---- +30 300 +25 250 +20 200 + +# Test H.3: Full sort (no LIMIT) — output must still be correctly sorted +query II +SELECT * FROM th_reorder ORDER BY id ASC; +---- +1 10 +3 30 +5 50 +10 100 +15 150 +18 180 +20 200 +25 250 +30 300 + +# Test H.4: ORDER BY expression (not a simple column) — reorder should +# gracefully skip, results still correct +query II +SELECT id, value FROM th_reorder ORDER BY id + 1 ASC LIMIT 3; +---- +1 10 +3 30 +5 50 + +# Cleanup Test H +statement ok +DROP TABLE th_mixed; + +statement ok +DROP TABLE th_reorder; + # Reset settings (SLT runner uses target_partitions=4, not system default) statement ok SET datafusion.execution.target_partitions = 4; From 41aaeebb35c631bb8f3ddc33c761b5d6b11b5695 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 06:56:39 +0000 Subject: [PATCH 3/5] test: add EXPLAIN assertions for row group reorder tests --- .../sqllogictest/test_files/sort_pushdown.slt | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 4f41c4b08aecf..a9d512228fc14 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2307,7 +2307,19 @@ CREATE EXTERNAL TABLE th_reorder(id INT, value INT) STORED AS PARQUET LOCATION 'test_files/scratch/sort_pushdown/th_reorder/data.parquet'; -# Test H.1: ASC ORDER BY with LIMIT — reorder helps TopK find min values first +# Test H.1: ASC ORDER BY with LIMIT — Inexact path (file has no declared ordering) +# Plan: SortExec(TopK) preserved. RG reorder happens inside DataSourceExec +# (not visible in EXPLAIN, but verified by unit tests in access_plan.rs). +query TT +EXPLAIN SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: th_reorder.id ASC NULLS LAST, fetch=3 +02)--TableScan: th_reorder projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/th_reorder/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + # Results must be correct regardless of RG reorder. query II SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; @@ -2317,6 +2329,16 @@ SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; 5 50 # Test H.2: DESC ORDER BY with LIMIT — reorder + reverse compose +query TT +EXPLAIN SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: th_reorder.id DESC NULLS FIRST, fetch=3 +02)--TableScan: th_reorder projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/th_reorder/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + query II SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; ---- From b072cfe6c740c0c2b11f7445de6ced6a7a715dfa Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 07:35:19 +0000 Subject: [PATCH 4/5] fix: use max statistics for DESC sort reorder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For overlapping row group ranges, sorting by min for DESC can pick a worse first RG. Example: RG0(50-60) vs RG1(40-100) — min DESC picks RG0 first (max=60), but RG1 contains the largest values (max=100). Use min for ASC and max for DESC to correctly prioritize the row group most likely to contain the optimal values for TopK. --- .../datasource-parquet/src/access_plan.rs | 77 ++++++++++++++----- 1 file changed, 58 insertions(+), 19 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index d98f64044db37..3f1a0a675e361 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -439,37 +439,51 @@ impl PreparedAccessPlan { } }; - // Get min values for the selected row groups + // Get the relevant statistics for the selected row groups. + // For ASC sort: use min values — we want the RG with the smallest min + // to come first (best candidate for "smallest values"). + // For DESC sort: use max values — we want the RG with the largest max + // to come first (best candidate for "largest values"). Using min for + // DESC can pick a worse first RG when ranges overlap (e.g., RG0 50-60 + // vs RG1 40-100 — RG1 has larger values but smaller min). let rg_metadata: Vec<&RowGroupMetaData> = self .row_group_indexes .iter() .map(|&idx| file_metadata.row_group(idx)) .collect(); - let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) { - Ok(vals) => vals, - Err(e) => { - debug!("Skipping RG reorder: cannot get min values: {e}"); - return Ok(self); + let stat_values = if descending { + match converter.row_group_maxes(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get max values: {e}"); + return Ok(self); + } + } + } else { + match converter.row_group_mins(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get min values: {e}"); + return Ok(self); + } } }; - // Sort indices by min values + // Sort indices by statistic values (min for ASC, max for DESC) let sort_options = arrow::compute::SortOptions { descending, nulls_first: first_sort_expr.options.nulls_first, }; - let sorted_indices = match arrow::compute::sort_to_indices( - &min_values, - Some(sort_options), - None, - ) { - Ok(indices) => indices, - Err(e) => { - debug!("Skipping RG reorder: sort failed: {e}"); - return Ok(self); - } - }; + let sorted_indices = + match arrow::compute::sort_to_indices(&stat_values, Some(sort_options), None) + { + Ok(indices) => indices, + Err(e) => { + debug!("Skipping RG reorder: sort failed: {e}"); + return Ok(self); + } + }; // Apply the reordering let original_indexes = self.row_group_indexes.clone(); @@ -821,7 +835,7 @@ mod test { .reorder_by_statistics(&sort_order, &metadata, &schema) .unwrap(); - // DESC: largest min first: RG1(200-299), RG0(50-99), RG2(1-30) + // DESC: largest max first: RG1(max=299), RG0(max=99), RG2(max=30) assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); } @@ -922,6 +936,31 @@ mod test { assert_eq!(plan.row_group_indexes, vec![0, 1]); } + #[test] + fn test_reorder_by_statistics_desc_uses_max_for_overlapping_rgs() { + // Overlapping ranges where min DESC would pick worse RG than max DESC: + // RG0: 50-60 (small range, moderate max) + // RG1: 40-100 (wide range, high max but lower min) + // RG2: 20-30 (low max) + // + // For ORDER BY DESC LIMIT N: + // Using min DESC: [RG0(50), RG1(40), RG2(20)] → reads RG0 first (max=60 only) + // Using max DESC: [RG1(100), RG0(60), RG2(30)] → reads RG1 first (max=100) + // + // RG1 is the better first choice for DESC because it contains the largest values. + let metadata = make_metadata_with_stats(&[(50, 60), (40, 100), (20, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_desc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Expected: RG1 (max=100) first, then RG0 (max=60), then RG2 (max=30) + assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); + } + #[test] fn test_reorder_by_statistics_skips_missing_column() { let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); From 802840a3bdc4156cfa2762957a1926c4d86a5182 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 14 Apr 2026 06:23:36 +0000 Subject: [PATCH 5/5] fix: prevent reorder+reverse double-reordering of row groups When sort_order_for_reorder is set, reorder_by_statistics already handles the sort direction (min for ASC, max for DESC). Applying reverse on top would undo the reorder. Use else-if so only one strategy is applied. Also adds sort_pushdown_inexact benchmark with pushdown_filters enabled to measure RG reorder benefit on wide-row TopK queries. --- benchmarks/bench.sh | 77 +++++++++++++++++++ .../queries/sort_pushdown_inexact/q1.sql | 8 ++ .../queries/sort_pushdown_inexact/q2.sql | 7 ++ .../queries/sort_pushdown_inexact/q3.sql | 8 ++ .../queries/sort_pushdown_inexact/q4.sql | 7 ++ benchmarks/src/sort_pushdown.rs | 9 ++- datafusion/datasource-parquet/src/opener.rs | 13 ++-- 7 files changed, 122 insertions(+), 7 deletions(-) create mode 100644 benchmarks/queries/sort_pushdown_inexact/q1.sql create mode 100644 benchmarks/queries/sort_pushdown_inexact/q2.sql create mode 100644 benchmarks/queries/sort_pushdown_inexact/q3.sql create mode 100644 benchmarks/queries/sort_pushdown_inexact/q4.sql diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index badf9ce4352ad..688bd1c81ad32 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -316,6 +316,9 @@ main() { sort_pushdown|sort_pushdown_sorted) data_sort_pushdown ;; + sort_pushdown_inexact) + data_sort_pushdown_inexact + ;; sort_tpch) # same data as for tpch data_tpch "1" "parquet" @@ -522,6 +525,9 @@ main() { sort_pushdown_sorted) run_sort_pushdown_sorted ;; + sort_pushdown_inexact) + run_sort_pushdown_inexact + ;; sort_tpch) run_sort_tpch "1" ;; @@ -1137,6 +1143,77 @@ run_sort_pushdown_sorted() { debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${SORT_PUSHDOWN_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } +# Generates data for sort pushdown Inexact benchmark. +# +# Unlike sort_pushdown (Exact path), this benchmark targets the Inexact path +# where row group reorder by statistics is beneficial. It produces a single +# large lineitem parquet file where row groups have NON-OVERLAPPING but +# OUT-OF-ORDER l_orderkey ranges (each RG internally sorted, RGs shuffled). +# +# This simulates append-heavy workloads where data is written in batches at +# different times, producing segments with tight value ranges but in arbitrary +# row-group order. +data_sort_pushdown_inexact() { + INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem" + if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then + echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}" + return + fi + + echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..." + + # Re-use the sort_pushdown data as the source (generate if missing) + data_sort_pushdown + + mkdir -p "${INEXACT_DIR}" + SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem" + + # Use datafusion-cli to: + # 1. Read the 3 source files as one table + # 2. Shuffle row order by hashing l_orderkey (produces deterministic but + # non-sorted output — each RG will cover a scattered subset of orderkeys + # with no overlap between RGs once row_group_size is small) + # 3. Write a single parquet file with small max_row_group_size so we get + # many RGs per file. + # + # Note: with pure hash-shuffle, each RG still covers the full orderkey range. + # To get tight RGs with non-overlapping ranges we bucket by hash first then + # sort within bucket. + (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " + CREATE EXTERNAL TABLE src + STORED AS PARQUET + LOCATION '${SRC_DIR}'; + + -- Bucket rows: split orderkey range into 64 chunks, scramble chunk + -- order, but sort within each chunk. This produces ~64 RG-sized + -- segments where each segment has a tight orderkey range but the + -- segments appear in scrambled (non-sorted) order in the file. + -- We use (l_orderkey * 1664525 + 1013904223) % 64 as a deterministic + -- scrambler (linear congruential generator) so bucket order is + -- effectively random but reproducible. + COPY ( + SELECT * FROM src + ORDER BY + (l_orderkey * 1664525 + 1013904223) % 64, + l_orderkey + ) + TO '${INEXACT_DIR}/shuffled.parquet' + STORED AS PARQUET + OPTIONS ('format.max_row_group_size' '100000'); + ") + + echo "Sort pushdown Inexact data generated at ${INEXACT_DIR}" + ls -la "${INEXACT_DIR}" +} + +# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics). +run_sort_pushdown_inexact() { + INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" + RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json" + echo "Running sort pushdown Inexact benchmark (row group reorder by statistics)..." + debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} +} + # Runs the sort integration benchmark run_sort_tpch() { SCALE_FACTOR=$1 diff --git a/benchmarks/queries/sort_pushdown_inexact/q1.sql b/benchmarks/queries/sort_pushdown_inexact/q1.sql new file mode 100644 index 0000000000000..d772bc486a12b --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q1.sql @@ -0,0 +1,8 @@ +-- Inexact path: TopK + DESC LIMIT on ASC-declared file. +-- With RG reorder, the first RG read contains the highest max value, +-- so TopK's threshold tightens quickly and subsequent RGs get filtered +-- efficiently via dynamic filter pushdown. +SELECT l_orderkey, l_partkey, l_suppkey +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown_inexact/q2.sql b/benchmarks/queries/sort_pushdown_inexact/q2.sql new file mode 100644 index 0000000000000..6e2bef44fc37e --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q2.sql @@ -0,0 +1,7 @@ +-- Inexact path: TopK + DESC LIMIT with larger fetch (1000). +-- Larger LIMIT means more row_replacements; RG reorder reduces the +-- total replacement count by tightening the threshold faster. +SELECT l_orderkey, l_partkey, l_suppkey +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 1000 diff --git a/benchmarks/queries/sort_pushdown_inexact/q3.sql b/benchmarks/queries/sort_pushdown_inexact/q3.sql new file mode 100644 index 0000000000000..d858ec79a67c9 --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q3.sql @@ -0,0 +1,8 @@ +-- Inexact path: wide projection (all columns) + DESC LIMIT. +-- Shows the row-level filter benefit: with a tight threshold from the +-- first RG, subsequent RGs skip decoding non-sort columns for filtered +-- rows — bigger wins for wide tables. +SELECT * +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown_inexact/q4.sql b/benchmarks/queries/sort_pushdown_inexact/q4.sql new file mode 100644 index 0000000000000..bd2efc5d3b992 --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q4.sql @@ -0,0 +1,7 @@ +-- Inexact path: wide projection + DESC LIMIT with larger fetch. +-- Combines wide-row row-level filter benefit with larger LIMIT to +-- demonstrate cumulative gains from RG reorder. +SELECT * +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 1000 diff --git a/benchmarks/src/sort_pushdown.rs b/benchmarks/src/sort_pushdown.rs index e7fce1921e7a8..e2a4615a3ef39 100644 --- a/benchmarks/src/sort_pushdown.rs +++ b/benchmarks/src/sort_pushdown.rs @@ -159,7 +159,14 @@ impl RunOpt { async fn benchmark_query(&self, query_id: usize) -> Result> { let sql = self.load_query(query_id)?; - let config = self.common.config()?; + let mut config = self.common.config()?; + // Enable parquet filter pushdown + late materialization. This is + // essential for the Inexact sort pushdown path: TopK's dynamic + // filter is pushed to the parquet reader, so only sort-column + // rows pass the filter's Decode non-sort columns are skipped for + // rows that don't pass the filter — this is where RG reorder's + // tight-threshold-first strategy pays off for wide-row queries. + config.options_mut().execution.parquet.pushdown_filters = true; let rt = self.common.build_runtime()?; let state = SessionStateBuilder::new() .with_config(config) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 0cbae848a974b..75cac1d2c1e15 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1231,18 +1231,19 @@ impl RowGroupsPrunedParquetOpen { let mut prepared_plan = access_plan.prepare(rg_metadata)?; // Reorder row groups by statistics if sort order is known. - // This helps TopK queries find optimal values first. + // This helps TopK queries find optimal values first by placing + // row groups with optimal min/max values at the front. + // When reorder is active, skip reverse — reorder already encodes + // the direction (uses min for ASC, max for DESC). if let Some(sort_order) = &prepared.sort_order_for_reorder { prepared_plan = prepared_plan.reorder_by_statistics( sort_order, file_metadata.as_ref(), &prepared.physical_file_schema, )?; - } - - // Potentially reverse the access plan for performance. - // See `ParquetSource::try_pushdown_sort` for the rationale. - if prepared.reverse_row_groups { + } else if prepared.reverse_row_groups { + // Fallback: simple reverse when no sort order statistics available. + // See `ParquetSource::try_pushdown_sort` for the rationale. prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; }