diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 8306d4b54c256..790674d67f84e 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -20,7 +20,7 @@ use crate::{OptimizerConfig, OptimizerRule}; use std::sync::Arc; use crate::join_key_set::JoinKeySet; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::{NullEquality, Result}; use datafusion_expr::expr::{BinaryExpr, Expr}; use datafusion_expr::logical_plan::{ @@ -85,6 +85,17 @@ impl OptimizerRule for EliminateCrossJoin { plan: LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { + // Fast path: nothing to do if the plan contains no `Join` nodes. + // Without this guard the rule still falls through to + // `rewrite_children`, which walks the entire plan, processes + // uncorrelated subqueries, and rewrites every direct child via + // `map_children` (clone-on-write) — paid by every query in the + // logical optimizer pipeline. Same shape as the + // `plan_has_subqueries` fast-path landed in #22298. + if !plan_has_joins(&plan) { + return Ok(Transformed::no(plan)); + } + let plan_schema = Arc::clone(plan.schema()); let mut possible_join_keys = JoinKeySet::new(); let mut all_inputs: Vec = vec![]; @@ -207,6 +218,62 @@ impl OptimizerRule for EliminateCrossJoin { } } +/// Returns `true` if `plan` contains at least one [`LogicalPlan::Join`] +/// node, either directly in its tree *or* inside an embedded subquery +/// plan reachable through `Expr::ScalarSubquery` / `Expr::InSubquery` +/// / `Expr::Exists` / `Expr::SetComparison`. +/// +/// Used as a fast-path gate at the top of [`EliminateCrossJoin::rewrite`] +/// so that join-free plans skip the full recursive rewrite. The +/// expression-side recursion is required because `rewrite_children` +/// also dives into uncorrelated subqueries via +/// `map_uncorrelated_subqueries`; ignoring them here would skip +/// optimizing a `CROSS JOIN` that sits only inside an +/// `IN (SELECT ... FROM a, b)`-style predicate. +/// +/// Read-only `apply` walk with early stop on first match, no +/// allocations. +fn plan_has_joins(plan: &LogicalPlan) -> bool { + let mut found = false; + let _ = plan.apply(|node| { + if matches!(node, LogicalPlan::Join(_)) { + found = true; + return Ok(TreeNodeRecursion::Stop); + } + // Recurse into subquery plans referenced from this node's + // expressions. `LogicalPlan::apply` does not descend into the + // subquery plan, so we have to do it explicitly. + let _ = node.apply_expressions(|expr| { + if found { + return Ok(TreeNodeRecursion::Stop); + } + let _ = expr.apply(|e| { + let subquery_plan = match e { + Expr::ScalarSubquery(sq) => Some(sq.subquery.as_ref()), + Expr::InSubquery(in_sq) => Some(in_sq.subquery.subquery.as_ref()), + Expr::Exists(exists) => Some(exists.subquery.subquery.as_ref()), + Expr::SetComparison(sc) => Some(sc.subquery.subquery.as_ref()), + _ => None, + }; + if let Some(sub) = subquery_plan + && plan_has_joins(sub) + { + found = true; + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + }); + Ok(TreeNodeRecursion::Continue) + }); + if found { + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } + }); + found +} + fn rewrite_children( optimizer: &impl OptimizerRule, plan: LogicalPlan, @@ -1418,4 +1485,102 @@ mod tests { Ok(()) } + + // ---------------- fast-path tests ---------------- + + /// `plan_has_joins` detects a `Join` at the root of the plan. + #[test] + fn plan_has_joins_detects_root_join() -> Result<()> { + let plan = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?) + .cross_join(test_table_scan_with_name("t2")?)? + .build()?; + assert!(plan_has_joins(&plan)); + Ok(()) + } + + /// `plan_has_joins` detects a `Join` nested under other operators. + #[test] + fn plan_has_joins_detects_nested_join() -> Result<()> { + let plan = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?) + .cross_join(test_table_scan_with_name("t2")?)? + .filter(col("t1.a").eq(col("t2.a")))? + .project(vec![col("t1.a")])? + .build()?; + assert!(plan_has_joins(&plan)); + Ok(()) + } + + /// Join-free plans return `false` so the fast-path in `rewrite` can + /// bail out before doing any recursion. + #[test] + fn plan_has_joins_returns_false_for_join_free_plan() -> Result<()> { + let plan = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?) + .filter(col("a").gt(lit(0_i32)))? + .project(vec![col("a"), col("b")])? + .build()?; + assert!(!plan_has_joins(&plan)); + Ok(()) + } + + /// `plan_has_joins` walks into embedded subquery plans — e.g. an + /// outer `Filter` whose predicate is `IN (SELECT ... FROM a, b)` + /// where the inner plan contains a `CROSS JOIN`. Without this the + /// fast-path would silently skip optimizing joins-in-subqueries + /// because `LogicalPlan::apply` doesn't descend into subquery + /// plan trees. + #[test] + fn plan_has_joins_detects_join_inside_subquery() -> Result<()> { + use datafusion_expr::in_subquery; + + // Subquery plan that itself contains a join. + let subquery_plan = + LogicalPlanBuilder::from(test_table_scan_with_name("sub_t1")?) + .cross_join(test_table_scan_with_name("sub_t2")?)? + .project(vec![col("sub_t1.a")])? + .build()?; + + // Outer plan with NO direct Join — only the IN subquery reaches one. + let outer = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?) + .filter(in_subquery(col("a"), Arc::new(subquery_plan)))? + .project(vec![col("a")])? + .build()?; + + assert!( + plan_has_joins(&outer), + "plan_has_joins must descend into subquery plans" + ); + Ok(()) + } + + /// `EliminateCrossJoin::rewrite` short-circuits on join-free plans: + /// no recursion into `rewrite_children`, no `Transformed::yes`, + /// the plan comes back identical. + #[test] + fn rewrite_short_circuits_when_plan_has_no_joins() -> Result<()> { + let plan = LogicalPlanBuilder::from(test_table_scan_with_name("t1")?) + .filter(col("a").gt(lit(0_i32)))? + .project(vec![col("a"), col("b")])? + .build()?; + + let starting_display = plan.display_indent_schema().to_string(); + let starting_schema = Arc::clone(plan.schema()); + + let rule = EliminateCrossJoin::new(); + let Transformed { + transformed, + data: optimized_plan, + .. + } = rule.rewrite(plan, &OptimizerContext::new())?; + + assert!( + !transformed, + "join-free plan should not be marked as transformed" + ); + assert_eq!(&starting_schema, optimized_plan.schema()); + assert_eq!( + starting_display, + optimized_plan.display_indent_schema().to_string() + ); + Ok(()) + } }