Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 166 additions & 1 deletion datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{OptimizerConfig, OptimizerRule};
use std::sync::Arc;

use crate::join_key_set::JoinKeySet;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{NullEquality, Result};
use datafusion_expr::expr::{BinaryExpr, Expr};
use datafusion_expr::logical_plan::{
Expand Down Expand Up @@ -85,6 +85,17 @@ impl OptimizerRule for EliminateCrossJoin {
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
// Fast path: nothing to do if the plan contains no `Join` nodes.
// Without this guard the rule still falls through to
// `rewrite_children`, which walks the entire plan, processes
// uncorrelated subqueries, and rewrites every direct child via
// `map_children` (clone-on-write) — paid by every query in the
// logical optimizer pipeline. Same shape as the
// `plan_has_subqueries` fast-path landed in #22298.
if !plan_has_joins(&plan) {
return Ok(Transformed::no(plan));
}

let plan_schema = Arc::clone(plan.schema());
let mut possible_join_keys = JoinKeySet::new();
let mut all_inputs: Vec<LogicalPlan> = vec![];
Expand Down Expand Up @@ -207,6 +218,62 @@ impl OptimizerRule for EliminateCrossJoin {
}
}

/// Returns `true` if `plan` contains at least one [`LogicalPlan::Join`]
/// node, either directly in its tree *or* inside an embedded subquery
/// plan reachable through `Expr::ScalarSubquery` / `Expr::InSubquery`
/// / `Expr::Exists` / `Expr::SetComparison`.
///
/// Used as a fast-path gate at the top of [`EliminateCrossJoin::rewrite`]
/// so that join-free plans skip the full recursive rewrite. The
/// expression-side recursion is required because `rewrite_children`
/// also dives into uncorrelated subqueries via
/// `map_uncorrelated_subqueries`; ignoring them here would skip
/// optimizing a `CROSS JOIN` that sits only inside an
/// `IN (SELECT ... FROM a, b)`-style predicate.
///
/// Read-only `apply` walk with early stop on first match, no
/// allocations.
fn plan_has_joins(plan: &LogicalPlan) -> bool {
let mut found = false;
let _ = plan.apply(|node| {
if matches!(node, LogicalPlan::Join(_)) {
found = true;
return Ok(TreeNodeRecursion::Stop);
}
// Recurse into subquery plans referenced from this node's
// expressions. `LogicalPlan::apply` does not descend into the
// subquery plan, so we have to do it explicitly.
let _ = node.apply_expressions(|expr| {
if found {
return Ok(TreeNodeRecursion::Stop);
}
let _ = expr.apply(|e| {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am surprised there isn'y already soem tranversal code that recurses into subqueries

Perhaps this one?
https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html#method.apply_with_subqueries

(though maybe you can't use that because it is for LogicalPlans not Exprs 🤔 )

let subquery_plan = match e {
Expr::ScalarSubquery(sq) => Some(sq.subquery.as_ref()),
Expr::InSubquery(in_sq) => Some(in_sq.subquery.subquery.as_ref()),
Expr::Exists(exists) => Some(exists.subquery.subquery.as_ref()),
Expr::SetComparison(sc) => Some(sc.subquery.subquery.as_ref()),
_ => None,
};
if let Some(sub) = subquery_plan
&& plan_has_joins(sub)
{
found = true;
return Ok(TreeNodeRecursion::Stop);
}
Ok(TreeNodeRecursion::Continue)
});
Ok(TreeNodeRecursion::Continue)
});
if found {
Ok(TreeNodeRecursion::Stop)
} else {
Ok(TreeNodeRecursion::Continue)
}
});
found
}
Comment thread
zhuqi-lucas marked this conversation as resolved.

fn rewrite_children(
optimizer: &impl OptimizerRule,
plan: LogicalPlan,
Expand Down Expand Up @@ -1418,4 +1485,102 @@ mod tests {

Ok(())
}

// ---------------- fast-path tests ----------------

/// `plan_has_joins` detects a `Join` at the root of the plan.
#[test]
fn plan_has_joins_detects_root_join() -> Result<()> {
let plan = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?)
.cross_join(test_table_scan_with_name("t2")?)?
.build()?;
assert!(plan_has_joins(&plan));
Ok(())
}

/// `plan_has_joins` detects a `Join` nested under other operators.
#[test]
fn plan_has_joins_detects_nested_join() -> Result<()> {
let plan = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?)
.cross_join(test_table_scan_with_name("t2")?)?
.filter(col("t1.a").eq(col("t2.a")))?
.project(vec![col("t1.a")])?
.build()?;
assert!(plan_has_joins(&plan));
Ok(())
}

/// Join-free plans return `false` so the fast-path in `rewrite` can
/// bail out before doing any recursion.
#[test]
fn plan_has_joins_returns_false_for_join_free_plan() -> Result<()> {
let plan = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?)
.filter(col("a").gt(lit(0_i32)))?
.project(vec![col("a"), col("b")])?
.build()?;
assert!(!plan_has_joins(&plan));
Ok(())
}

/// `plan_has_joins` walks into embedded subquery plans — e.g. an
/// outer `Filter` whose predicate is `IN (SELECT ... FROM a, b)`
/// where the inner plan contains a `CROSS JOIN`. Without this the
/// fast-path would silently skip optimizing joins-in-subqueries
/// because `LogicalPlan::apply` doesn't descend into subquery
/// plan trees.
#[test]
fn plan_has_joins_detects_join_inside_subquery() -> Result<()> {
use datafusion_expr::in_subquery;

// Subquery plan that itself contains a join.
let subquery_plan =
LogicalPlanBuilder::from(test_table_scan_with_name("sub_t1")?)
.cross_join(test_table_scan_with_name("sub_t2")?)?
.project(vec![col("sub_t1.a")])?
.build()?;

// Outer plan with NO direct Join — only the IN subquery reaches one.
let outer = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?)
.filter(in_subquery(col("a"), Arc::new(subquery_plan)))?
.project(vec![col("a")])?
.build()?;

assert!(
plan_has_joins(&outer),
"plan_has_joins must descend into subquery plans"
);
Ok(())
}

/// `EliminateCrossJoin::rewrite` short-circuits on join-free plans:
/// no recursion into `rewrite_children`, no `Transformed::yes`,
/// the plan comes back identical.
#[test]
fn rewrite_short_circuits_when_plan_has_no_joins() -> Result<()> {
let plan = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?)
.filter(col("a").gt(lit(0_i32)))?
.project(vec![col("a"), col("b")])?
.build()?;

let starting_display = plan.display_indent_schema().to_string();
let starting_schema = Arc::clone(plan.schema());

let rule = EliminateCrossJoin::new();
let Transformed {
transformed,
data: optimized_plan,
..
} = rule.rewrite(plan, &OptimizerContext::new())?;

assert!(
!transformed,
"join-free plan should not be marked as transformed"
);
assert_eq!(&starting_schema, optimized_plan.schema());
assert_eq!(
starting_display,
optimized_plan.display_indent_schema().to_string()
);
Ok(())
}
}
Comment thread
zhuqi-lucas marked this conversation as resolved.
Loading