Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/breaking_changes_detector_comment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ jobs:
\`\`\`

</details>"

# Use --raw-field (not --field): always sends the value as a literal string. while --field would treat a leading `@` as a file to read
# (even though the body does not start with user input we are being cautious)
if [ -n "$COMMENT_ID" ]; then
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,12 @@ config_namespace! {
/// into the file scan phase.
pub enable_topk_dynamic_filter_pushdown: bool, default = true

/// When set to true, the physical optimizer will try to evaluate
/// simple ORDER BY ... LIMIT queries over a narrow key-only input first,
/// carrying hidden row numbers and materializing the full rows only
/// after the TopK has been computed.
pub enable_row_number_topk_late_materialization: bool, default = true

/// When set to true, the optimizer will attempt to push down Join dynamic filters
/// into the file scan phase.
pub enable_join_dynamic_filter_pushdown: bool, default = true
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/optimizer_rule_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ in multiple phases.
| 15 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
| 16 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
| 17 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
| 18 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
| 19 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
| 20 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
| 21 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
| 18 | `LateMaterialization` | - | Rewrites simple TopK plans to sort a narrow key-only input before materializing full rows by row number. |
| 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
40 changes: 39 additions & 1 deletion datafusion/core/tests/parquet/external_access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use datafusion_expr::{Expr, col, lit};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::metrics::{MetricValue, MetricsSet};

use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::{FileRowsSelection, file_scan_config::FileScanConfigBuilder};
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::properties::WriterProperties;
Expand Down Expand Up @@ -128,6 +128,35 @@ async fn selection_scan() {
}
}

#[tokio::test]
async fn file_rows_selection() {
TestFull {
access_plan: None,
file_rows_selection: Some(FileRowsSelection::new(vec![1, 5, 6, 9])),
expected_rows: 4,
predicate: None,
}
.run()
.await
.unwrap();
}

#[tokio::test]
async fn file_rows_selection_intersects_access_plan() {
TestFull {
access_plan: Some(ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Skip,
])),
file_rows_selection: Some(FileRowsSelection::new(vec![1, 5, 6, 9])),
expected_rows: 1,
predicate: None,
}
.run()
.await
.unwrap();
}

#[tokio::test]
async fn skip_scan() {
let plans = vec![
Expand Down Expand Up @@ -170,6 +199,7 @@ async fn plan_and_filter() {
// initial
let parquet_metrics = TestFull {
access_plan,
file_rows_selection: None,
expected_rows: 0,
predicate: Some(predicate),
}
Expand Down Expand Up @@ -227,6 +257,7 @@ async fn bad_row_groups() {
RowGroupAccess::Skip,
RowGroupAccess::Scan,
])),
file_rows_selection: None,
expected_rows: 0,
predicate: None,
}
Expand All @@ -249,6 +280,7 @@ async fn bad_selection() {
])),
RowGroupAccess::Skip,
])),
file_rows_selection: None,
// expects that we hit an error, this should not be run
expected_rows: 10000,
predicate: None,
Expand Down Expand Up @@ -300,6 +332,7 @@ impl Test {
} = self;
TestFull {
access_plan,
file_rows_selection: None,
expected_rows,
predicate: None,
}
Expand All @@ -317,6 +350,7 @@ impl Test {
/// 4. Returns the statistics from running the plan
struct TestFull {
access_plan: Option<ParquetAccessPlan>,
file_rows_selection: Option<FileRowsSelection>,
expected_rows: usize,
predicate: Option<Expr>,
}
Expand All @@ -327,6 +361,7 @@ impl TestFull {

let Self {
access_plan,
file_rows_selection,
expected_rows,
predicate,
} = self;
Expand All @@ -351,6 +386,9 @@ impl TestFull {
if let Some(access_plan) = access_plan {
partitioned_file = partitioned_file.with_extension(access_plan);
}
if let Some(file_rows_selection) = file_rows_selection {
partitioned_file = partitioned_file.with_extension(file_rows_selection);
}

// Create a DataSourceExec to read the file
let object_store_url = ObjectStoreUrl::local_filesystem();
Expand Down
Loading
Loading