Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions datafusion/optimizer/src/extract_leaf_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,13 @@ fn try_push_into_inputs(
return Ok(None);
}

// Unnest may output a column with the same name but a different value/type
// than its input column, and name-based routing cannot distinguish the two.
// In addition, Unnest cannot be rebuilt via
// `node.with_new_exprs(node.expressions(), new_inputs)`: its `expressions()`
// are non-empty, but its `with_new_exprs` requires an empty expression list.
if matches!(node, LogicalPlan::Unnest(_)) {
return Ok(None);
}

// SubqueryAlias remaps qualifiers between input and output.
// Rewrite pairs/columns from alias-space to input-space before routing.
let remapped = if let LogicalPlan::SubqueryAlias(sa) = node {
Expand Down Expand Up @@ -3035,4 +3042,48 @@ mod tests {

Ok(())
}

/// Regression test for the `Assertion failed: expr.is_empty(): Unnest`
/// internal error.
///
/// `try_push_into_inputs` rebuilds the parent node via
/// `node.with_new_exprs(node.expressions(), new_inputs)`. For `Unnest`,
/// `apply_expressions` exposes the `exec_columns` as `Expr::Column`s
/// (so `expressions()` is **non-empty**), but `with_new_exprs` for
/// `Unnest` immediately calls `assert_no_expressions(expr)?` and errors
/// out. The optimizer should therefore treat `Unnest` as a barrier and
/// bail out instead of attempting to push a projection through it.
///
/// The test builds `Filter -> Unnest -> TableScan`, where the filter
/// predicate applies `leaf_udf` to the unnested column, runs the two
/// leaf-expression rules, and snapshots the optimized plan: the extracted
/// projection must remain directly above the `Unnest` node.
#[test]
fn test_no_push_through_unnest() -> Result<()> {
use arrow::datatypes::{DataType, Field, Schema};

// Input table `t`: a list-of-Int32 column to unnest plus a plain Int32
// sibling column.
let schema = Schema::new(vec![
Field::new("list_col", DataType::new_list(DataType::Int32, true), true),
Field::new("other_col", DataType::Int32, true),
]);
let table_scan =
datafusion_expr::logical_plan::table_scan(Some("t"), &schema, None)?
.build()?;
// The filter sits above the Unnest and references `list_col`, i.e. the
// column name that Unnest both consumes and produces.
// NOTE(review): `leaf_udf` is a test helper defined elsewhere in this
// module; presumably it builds a UDF call that the extraction rule treats
// as a leaf expression -- confirm against its definition.
let plan = LogicalPlanBuilder::from(table_scan)
.unnest_column("list_col")?
.filter(leaf_udf(col("list_col"), "x").eq(lit(1i32)))?
.build()?;

// Run only the extraction rule followed by the push-down rule, with the
// pass count capped at 1 via `with_max_passes(1)`.
let ctx = OptimizerContext::new().with_max_passes(1);
let optimizer = Optimizer::with_rules(vec![
Arc::new(ExtractLeafExpressions::new()),
Arc::new(PushDownLeafProjections::new()),
]);
// The third argument is a no-op callback.
let optimized = optimizer.optimize(plan, &ctx, |_, _| {})?;

// Expected: optimization succeeds (no internal assertion error) and the
// `__datafusion_extracted_1` projection stays above `Unnest` rather than
// being pushed down into the `TableScan`.
insta::assert_snapshot!(format!("{optimized}"), @r#"
Projection: list_col, t.other_col
Filter: __datafusion_extracted_1 = Int32(1)
Projection: leaf_udf(list_col, Utf8("x")) AS __datafusion_extracted_1, list_col, t.other_col
Unnest: lists[t.list_col|depth=1] structs[]
TableScan: t
"#);

Ok(())
}
}
44 changes: 44 additions & 0 deletions datafusion/sqllogictest/test_files/unnest.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1419,3 +1419,47 @@ FROM (

statement ok
DROP TABLE unused_unnest_pruning;

## Regression: pushing a leaf-extracted projection (containing get_field,
## which has MoveTowardsLeafNodes placement) through an `Unnest` used to
## trip `Assertion failed: expr.is_empty(): Unnest` inside
## `PushDownLeafProjections`. The optimizer must not try to push these
## projections down through an `Unnest` and should produce a valid plan.

statement ok
CREATE TABLE struct_and_list_table
AS VALUES
(struct(1, 2), [10, 20, 30]),
(struct(3, 4), [40, 50]);

query I
SELECT sum(get_field(s, 'c0'))
FROM (SELECT s, unnest(arr)
FROM (SELECT column1 AS s, column2 AS arr
FROM struct_and_list_table));
----
9

statement ok
DROP TABLE struct_and_list_table;

## Regression: get_field directly references the struct produced by unnest.
## This covers the case where the leaf-extracted expression depends on the
## unnested column itself rather than a sibling input column below the Unnest.

statement ok
CREATE TABLE list_struct_table
AS VALUES
([struct(1, 'a'), struct(2, 'b')]),
([struct(3, 'c')]);

query IT
SELECT get_field(unnest(column1), 'c0'), get_field(unnest(column1), 'c1')
FROM list_struct_table;
----
1 a
2 b
3 c

statement ok
DROP TABLE list_struct_table;