fix: impl handle_child_pushdown_result for SortExec#21527
fix: impl handle_child_pushdown_result for SortExec#21527haohuaijin wants to merge 7 commits intoapache:mainfrom
handle_child_pushdown_result for SortExec#21527Conversation
|
the reproduce result in #21526, for this pr |
handle_child_pushdown_result for SortExec
|
|
||
| /// FilterExec above a plain SortExec (no fetch) should be pushed below it. | ||
| /// The scan supports pushdown, so the filter lands in the DataSourceExec. | ||
| #[test] |
There was a problem hiding this comment.
Can we add these as SLT tests instead?
There was a problem hiding this comment.
due to
the subquery's order by is removed by our sql parser layer, so i can't reproduce use sql
| // Only absorb filters in Pre phase for a plain sort (no fetch). | ||
| // A sort with fetch (TopK) must not accept filters: reordering | ||
| // filter vs. limit would change semantics. | ||
| if phase != FilterPushdownPhase::Pre || self.fetch.is_some() { |
There was a problem hiding this comment.
Could you explain why FilterPushdownPhase::Post should not be allowed, maybe with examples of where it would be incorrect?
There was a problem hiding this comment.
Good question, my initial though was that the swap of sort and filter is unrelated to dynamic filters, so I only do swap for pre pahse. but today when i reconsider this part, i find one incorrect example. i added this example in 10f1462
if i remove the phase != FilterPushdownPhase::Pre, the result will like below, we insert the dynamic filter under the sort, before the datasource if datasource do not accept dynamic filter,
insta::assert_snapshot!(
OptimizationTest::new(
Arc::clone(&plan),
FilterPushdown::new_post_optimization(),
true
),
@r"
OptimizationTest:
input:
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=false
output:
Ok:
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
- FilterExec: DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=false
"
);| // Build a single conjunctive predicate from the unsupported filters | ||
| // and insert a FilterExec between this SortExec and its child. | ||
| let predicate = datafusion_physical_expr::conjunction(unsupported_filters); | ||
| let new_child = | ||
| Arc::new(FilterExec::try_new(predicate, Arc::clone(self.input()))?) | ||
| as Arc<dyn ExecutionPlan>; | ||
| let new_sort = Arc::new( | ||
| SortExec::new(self.expr.clone(), new_child) | ||
| .with_fetch(self.fetch()) | ||
| .with_preserve_partitioning(self.preserve_partitioning()), | ||
| ) as Arc<dyn ExecutionPlan>; | ||
|
|
||
| Ok(FilterPushdownPropagation { | ||
| filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()], | ||
| updated_node: Some(new_sort), | ||
| }) |
There was a problem hiding this comment.
Swapping the order from FilterExec -> SortExec to SortExec -> FilterExec makes sense to me 👍🏻
980d94b to
0652ed8
Compare
|
Thanks for your reviews @adriangb , i replyed for your comment. |
Which issue does this PR close?
Rationale for this change
FilterPushdowndoes not push filters throughSortExecbecauseSortExeclacks an implementation ofhandle_child_pushdown_result. When aFilterExecsits above a plainSortExec(no fetch), the filter can safely be moved below the sort without changing semantics, since sorting preserves all rows.What changes are included in this PR?
Implemented
handle_child_pushdown_resultforSortExecindatafusion/physical-plan/src/sorts/sort.rs:SortExec(no fetch) in thePrephase: any filters not absorbed by the child are collected into a newFilterExecinserted between theSortExecand its child.SortExecwith fetch (TopK) or non-Prephases: filters are not absorbed, preserving correct TopK semantics where filtering after limiting would change results.Are these changes tested?
Added comprehensive tests in
datafusion/core/tests/physical_optimizer/filter_pushdown.rscovering:preserve_partitioningflagand one test case
test_filter_pushdown_through_sort_with_projectionfor use LogicalPlanBuilder to reproduce.not able to use sql to reproduce due to #15886
Are there any user-facing changes?