diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 85361ef5e17e1..9cfd4106310c6 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1131,6 +1131,11 @@ config_namespace! { /// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. pub enable_dynamic_filter_pushdown: bool, default = true + /// When set to true, the pluggable `ExpressionAnalyzerRegistry` from + /// `SessionState` is used for expression-level statistics estimation + /// (NDV, selectivity, min/max, null fraction) in physical plan operators. + pub use_expression_analyzer: bool, default = false + /// When set to true, the optimizer will insert filters before a join between /// a nullable and non-nullable column to filter out nulls on the nullable side. This /// filter can add additional overhead when the file format does not fully support @@ -1263,9 +1268,11 @@ config_namespace! { pub join_reordering: bool, default = true /// When set to true, the physical plan optimizer uses the pluggable - /// `StatisticsRegistry` for statistics propagation across operators. - /// This enables more accurate cardinality estimates compared to each - /// operator's built-in `partition_statistics`. + /// `StatisticsRegistry` for a bottom-up statistics walk across operators, + /// enabling more accurate cardinality estimates. Enabling + /// `use_expression_analyzer` alongside this flag gives built-in + /// providers access to custom expression-level analyzers (NDV, + /// selectivity) for the operators they process. pub use_statistics_registry: bool, default = false /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. 
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index a5749e70ceaaa..606550a4f9c93 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -67,6 +67,7 @@ use datafusion_optimizer::{ Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule, }; use datafusion_physical_expr::create_physical_expr; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::PhysicalOptimizerContext; use datafusion_physical_optimizer::PhysicalOptimizerRule; @@ -167,6 +168,8 @@ pub struct SessionState { extension_types: ExtensionTypeRegistryRef, /// Deserializer registry for extensions. serializer_registry: Arc, + /// Registry for expression-level statistics analyzers (NDV, selectivity, etc.) + expression_analyzer_registry: Arc, /// Holds registered external FileFormat implementations file_formats: HashMap>, /// Session configuration @@ -212,6 +215,13 @@ impl PhysicalOptimizerContext for SessionState { fn statistics_registry(&self) -> Option<&StatisticsRegistry> { self.statistics_registry.as_ref() } + + fn expression_analyzer_registry(&self) -> Option<&Arc> { + self.config_options() + .optimizer + .use_expression_analyzer + .then_some(&self.expression_analyzer_registry) + } } impl Debug for SessionState { @@ -225,6 +235,10 @@ impl Debug for SessionState { .field("runtime_env", &self.runtime_env) .field("catalog_list", &self.catalog_list) .field("serializer_registry", &self.serializer_registry) + .field( + "expression_analyzer_registry", + &self.expression_analyzer_registry, + ) .field("file_formats", &self.file_formats) .field("execution_props", &self.execution_props) .field("table_options", &self.table_options) @@ -944,6 +958,11 @@ impl SessionState { &self.serializer_registry } + /// Return the [`ExpressionAnalyzerRegistry`] for 
expression-level statistics + pub fn expression_analyzer_registry(&self) -> &Arc { + &self.expression_analyzer_registry + } + /// Return version of the cargo package that produced this query pub fn version(&self) -> &str { env!("CARGO_PKG_VERSION") @@ -1024,6 +1043,7 @@ pub struct SessionStateBuilder { window_functions: Option>>, extension_types: Option, serializer_registry: Option>, + expression_analyzer_registry: Option>, file_formats: Option>>, config: Option, table_options: Option, @@ -1066,6 +1086,7 @@ impl SessionStateBuilder { window_functions: None, extension_types: None, serializer_registry: None, + expression_analyzer_registry: None, file_formats: None, table_options: None, config: None, @@ -1123,6 +1144,7 @@ impl SessionStateBuilder { window_functions: Some(existing.window_functions.into_values().collect_vec()), extension_types: Some(existing.extension_types), serializer_registry: Some(existing.serializer_registry), + expression_analyzer_registry: Some(existing.expression_analyzer_registry), file_formats: Some(existing.file_formats.into_values().collect_vec()), config: Some(new_config), table_options: Some(existing.table_options), @@ -1381,6 +1403,15 @@ impl SessionStateBuilder { self } + /// Set the [`ExpressionAnalyzerRegistry`] for expression-level statistics + pub fn with_expression_analyzer_registry( + mut self, + expression_analyzer_registry: Arc, + ) -> Self { + self.expression_analyzer_registry = Some(expression_analyzer_registry); + self + } + /// Set the map of [`FileFormatFactory`]s pub fn with_file_formats( mut self, @@ -1522,6 +1553,7 @@ impl SessionStateBuilder { window_functions, extension_types, serializer_registry, + expression_analyzer_registry, file_formats, table_options, config, @@ -1561,6 +1593,8 @@ impl SessionStateBuilder { extension_types: Arc::new(MemoryExtensionTypeRegistry::default()), serializer_registry: serializer_registry .unwrap_or_else(|| Arc::new(EmptySerializerRegistry)), + expression_analyzer_registry: 
expression_analyzer_registry + .unwrap_or_else(|| Arc::new(ExpressionAnalyzerRegistry::new())), file_formats: HashMap::new(), table_options: table_options.unwrap_or_else(|| { TableOptions::default_from_session_config(config.options()) @@ -1748,6 +1782,13 @@ impl SessionStateBuilder { &mut self.serializer_registry } + /// Returns the current expression_analyzer_registry value + pub fn expression_analyzer_registry( + &mut self, + ) -> &mut Option> { + &mut self.expression_analyzer_registry + } + /// Returns the current file_formats value pub fn file_formats(&mut self) -> &mut Option>> { &mut self.file_formats @@ -1823,6 +1864,10 @@ impl Debug for SessionStateBuilder { .field("runtime_env", &self.runtime_env) .field("catalog_list", &self.catalog_list) .field("serializer_registry", &self.serializer_registry) + .field( + "expression_analyzer_registry", + &self.expression_analyzer_registry, + ) .field("file_formats", &self.file_formats) .field("execution_props", &self.execution_props) .field("table_options", &self.table_options) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index a025446aa37e8..12d9a79688c82 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -56,6 +56,7 @@ use crate::physical_plan::{ WindowExpr, displayable, windows, }; use crate::schema_equivalence::schema_satisfied_by; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use arrow::array::{RecordBatch, builder::StringBuilder}; use arrow::compute::SortOptions; @@ -1097,12 +1098,12 @@ impl DefaultPhysicalPlanner { input_schema.as_arrow(), )? { PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { - FilterExecBuilder::new( + let builder = FilterExecBuilder::new( Arc::clone(&runtime_expr[0]), physical_input, ) - .with_batch_size(session_state.config().batch_size()) - .build()? + .with_batch_size(session_state.config().batch_size()); + builder.build()? 
} PlanAsyncExpr::Async( async_map, @@ -1645,7 +1646,7 @@ impl DefaultPhysicalPlanner { { // Use SortMergeJoin if hash join is not preferred let join_on_len = join_on.len(); - Arc::new(SortMergeJoinExec::try_new( + let exec = SortMergeJoinExec::try_new( physical_left, physical_right, join_on, @@ -1653,7 +1654,8 @@ impl DefaultPhysicalPlanner { *join_type, vec![SortOptions::default(); join_on_len], *null_equality, - )?) + )?; + Arc::new(exec) } else if session_state.config().target_partitions() > 1 && session_state.config().repartition_joins() && prefer_hash_join @@ -2761,15 +2763,36 @@ impl DefaultPhysicalPlanner { // to verify that the plan fulfills the base requirements. InvariantChecker(InvariantLevel::Always).check(&plan)?; + let use_expression_analyzer = session_state + .config_options() + .optimizer + .use_expression_analyzer; let mut new_plan = Arc::clone(&plan); + if use_expression_analyzer { + new_plan = Self::inject_expression_analyzer( + new_plan, + session_state.expression_analyzer_registry(), + )?; + } for optimizer in optimizers { let before_schema = new_plan.schema(); + let plan_before_rule = Arc::clone(&new_plan); new_plan = optimizer .optimize_with_context(new_plan, session_state) .map_err(|e| { DataFusionError::Context(optimizer.name().to_string(), Box::new(e)) })?; + // Re-inject ExpressionAnalyzer registry into any exec nodes created or replaced by + // this rule. Skip if the rule returned the same plan unchanged to + // avoid an O(nodes) walk for no-op rules. + if use_expression_analyzer && !Arc::ptr_eq(&plan_before_rule, &new_plan) { + new_plan = Self::inject_expression_analyzer( + new_plan, + session_state.expression_analyzer_registry(), + )?; + } + // This only checks the schema in release build, and performs additional checks in debug mode. 
OptimizationInvariantChecker::new(optimizer) .check(&new_plan, &before_schema)?; @@ -2839,6 +2862,24 @@ impl DefaultPhysicalPlanner { Ok(mem_exec) } + /// Walks `plan` and injects `registry` into every exec node that accepts it, + /// skipping nodes that already have a registry set. + fn inject_expression_analyzer( + plan: Arc, + registry: &Arc, + ) -> Result> { + use datafusion_common::tree_node::{Transformed, TreeNode}; + + plan.transform_up(|node| { + if let Some(updated) = node.with_expression_analyzer_registry(registry) { + Ok(Transformed::yes(updated)) + } else { + Ok(Transformed::no(node)) + } + }) + .map(|t| t.data) + } + fn create_project_physical_exec( &self, session_state: &SessionState, @@ -2910,9 +2951,10 @@ impl DefaultPhysicalPlanner { .into_iter() .map(|(expr, alias)| ProjectionExpr { expr, alias }) .collect(); - let new_proj_exec = - ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?; - Ok(Arc::new(new_proj_exec)) + Ok(Arc::new(ProjectionExec::try_new( + proj_exprs, + Arc::new(async_exec), + )?)) } _ => internal_err!("Unexpected PlanAsyncExpressions variant"), } diff --git a/datafusion/core/tests/physical_optimizer/expression_analyzer_injection.rs b/datafusion/core/tests/physical_optimizer/expression_analyzer_injection.rs new file mode 100644 index 0000000000000..449eafd65afdb --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/expression_analyzer_injection.rs @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests that ExpressionAnalyzerRegistry is preserved through optimizer rules. +//! +//! Optimizer rules like CombinePartialFinalAggregate, EnforceDistribution, and +//! ProjectionPushdown rebuild exec nodes internally. The post-rule injection loop +//! in the physical planner must re-inject the ExpressionAnalyzer registry into those new nodes. + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow::array::{Int64Array, RecordBatch}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion::prelude::SessionContext; + use datafusion_common::Result; + use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; + use datafusion_execution::config::SessionConfig; + use datafusion_physical_plan::ExecutionPlan; + + /// Creates a SessionContext with `use_expression_analyzer = true`. + fn make_ctx() -> SessionContext { + let mut config = SessionConfig::new(); + config.options_mut().optimizer.use_expression_analyzer = true; + SessionContext::new_with_config(config) + } + + /// Registers an in-memory table with a single int64 column. 
+ async fn register_table(ctx: &SessionContext, name: &str) -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![1i64, 2, 3]))], + )?; + let table = datafusion::datasource::MemTable::try_new(schema, vec![vec![batch]])?; + ctx.register_table(name, Arc::new(table))?; + Ok(()) + } + + /// Asserts that every node in `plan` that declares + /// `uses_expression_level_statistics() == true` has the registry set. + fn assert_expression_analyzer_injected(plan: &Arc) -> Result<()> { + plan.clone().apply(|node| { + if node.uses_expression_level_statistics() { + assert!( + node.expression_analyzer_registry().is_some(), + "{} declares uses_expression_level_statistics but has no registry after optimization", + node.name() + ); + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(()) + } + + /// Filter and aggregate go through CombinePartialFinalAggregate which rebuilds + /// the partial/final AggregateExec pair; both must end up with the ExpressionAnalyzer registry injected. + #[tokio::test] + async fn test_expression_analyzer_injected_through_aggregate_rewrite() -> Result<()> { + let ctx = make_ctx(); + register_table(&ctx, "t").await?; + + let df = ctx + .sql("SELECT a, COUNT(*) FROM t WHERE a > 1 GROUP BY a") + .await?; + let plan = df.create_physical_plan().await?; + + assert_expression_analyzer_injected(&plan) + } + + /// A simple filter query - FilterExec must have the registry even without complex rewrites. + #[tokio::test] + async fn test_expression_analyzer_injected_filter() -> Result<()> { + let ctx = make_ctx(); + register_table(&ctx, "t").await?; + + let df = ctx.sql("SELECT a FROM t WHERE a > 1").await?; + let plan = df.create_physical_plan().await?; + + assert_expression_analyzer_injected(&plan) + } + + /// A join query - HashJoinExec must have the registry after join selection rewrites it. 
+ #[tokio::test] + async fn test_expression_analyzer_injected_join() -> Result<()> { + let ctx = make_ctx(); + register_table(&ctx, "t1").await?; + register_table(&ctx, "t2").await?; + + let df = ctx + .sql("SELECT t1.a FROM t1 JOIN t2 ON t1.a = t2.a WHERE t1.a > 0") + .await?; + let plan = df.create_physical_plan().await?; + + assert_expression_analyzer_injected(&plan) + } + + /// Disabled by default: verify that the flag gates the injection walk. + /// When `use_expression_analyzer = false`, exec nodes must NOT have the registry set. + #[tokio::test] + async fn test_expression_analyzer_not_injected_when_disabled() -> Result<()> { + let ctx = SessionContext::new(); // default config, ExpressionAnalyzer disabled + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![1i64, 2, 3]))], + )?; + let table = datafusion::datasource::MemTable::try_new(schema, vec![vec![batch]])?; + ctx.register_table("t", Arc::new(table))?; + + let df = ctx.sql("SELECT a FROM t WHERE a > 1").await?; + let plan = df.create_physical_plan().await?; + + plan.clone().apply(|node| { + if node.uses_expression_level_statistics() { + assert!( + node.expression_analyzer_registry().is_none(), + "{} should NOT have the registry when use_expression_analyzer=false", + node.name() + ); + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(()) + } +} diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index b7ba661d2343a..999ce12eb1879 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -39,4 +39,5 @@ mod test_utils; mod window_optimize; mod window_topn; +mod expression_analyzer_injection; mod pushdown_utils; diff --git a/datafusion/physical-expr/src/expression_analyzer/default.rs b/datafusion/physical-expr/src/expression_analyzer/default.rs new file mode 
100644 index 0000000000000..b51701e04e2fa --- /dev/null +++ b/datafusion/physical-expr/src/expression_analyzer/default.rs @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Default expression analyzer with Selinger-style estimation. + +use std::sync::Arc; + +use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; +use datafusion_expr::Operator; + +use crate::PhysicalExpr; +use crate::expressions::{BinaryExpr, Column, Literal, NotExpr}; + +use super::{AnalysisResult, ExpressionAnalyzer, ExpressionAnalyzerRegistry}; + +/// Default expression analyzer with Selinger-style estimation. 
+/// +/// Handles common expression types: +/// - Column references (uses column statistics) +/// - Binary expressions (AND, OR, comparison operators) +/// - Literals (constant selectivity/NDV) +/// - NOT expressions (1 - child selectivity) +#[derive(Debug, Default, Clone)] +pub struct DefaultExpressionAnalyzer; + +impl DefaultExpressionAnalyzer { + /// Get column index from a Column expression + fn get_column_index(expr: &Arc) -> Option { + expr.downcast_ref::().map(|c| c.index()) + } + + /// Get column statistics for an expression if it's a column reference + fn get_column_stats<'a>( + expr: &Arc, + input_stats: &'a Statistics, + ) -> Option<&'a ColumnStatistics> { + Self::get_column_index(expr) + .and_then(|idx| input_stats.column_statistics.get(idx)) + } + + /// Resolve NDV for a binary predicate by taking the max of both sides. + /// + /// Using max is symmetric (order-independent) and handles column-vs-column, + /// column-vs-expression, and expression-vs-expression uniformly through the + /// registry chain. Returns `None` when either side has unknown NDV. + fn resolve_ndv( + left: &Arc, + right: &Arc, + input_stats: &Statistics, + registry: &ExpressionAnalyzerRegistry, + ) -> Option { + let l = registry.get_distinct_count(left, input_stats); + let r = registry.get_distinct_count(right, input_stats); + match (l, r) { + (Some(a), Some(b)) => Some(a.max(b)).filter(|&n| n > 0), + _ => None, + } + } +} + +impl ExpressionAnalyzer for DefaultExpressionAnalyzer { + fn get_selectivity( + &self, + expr: &Arc, + input_stats: &Statistics, + registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + // Binary expressions: AND, OR, comparisons + if let Some(binary) = expr.downcast_ref::() { + match binary.op() { + // AND/OR: only provide a value when both children have estimates. + // Delegating when either child has no information prevents arbitrary + // constants from contaminating the product/inclusion-exclusion formula. 
+ Operator::And | Operator::Or => { + if let (Some(left), Some(right)) = ( + registry.get_selectivity(binary.left(), input_stats), + registry.get_selectivity(binary.right(), input_stats), + ) { + let sel = match binary.op() { + Operator::And => left * right, + Operator::Or => left + right - left * right, + _ => unreachable!(), + }; + return AnalysisResult::Computed(sel); + } + } + + // Equality: 1/NDV. Delegate when NDV is unknown so default_selectivity applies. + Operator::Eq => { + if let Some(ndv) = Self::resolve_ndv( + binary.left(), + binary.right(), + input_stats, + registry, + ) { + return AnalysisResult::Computed(1.0 / (ndv as f64)); + } + } + + // Inequality: 1 - 1/NDV. Delegate when NDV is unknown. + Operator::NotEq => { + if let Some(ndv) = Self::resolve_ndv( + binary.left(), + binary.right(), + input_stats, + registry, + ) { + return AnalysisResult::Computed(1.0 - (1.0 / (ndv as f64))); + } + } + + // All other operators: no statistics available, let caller decide. + _ => {} + } + + return AnalysisResult::Delegate; + } + + // NOT expression: 1 - child selectivity. Delegate if child has no estimate. + if let Some(not_expr) = expr.downcast_ref::() { + if let Some(child_sel) = registry.get_selectivity(not_expr.arg(), input_stats) + { + return AnalysisResult::Computed(1.0 - child_sel); + } + return AnalysisResult::Delegate; + } + + // Literal boolean: exact selectivity, no statistics needed. 
+ if let Some(b) = + expr.downcast_ref::() + .and_then(|lit| match lit.value() { + ScalarValue::Boolean(Some(b)) => Some(*b), + _ => None, + }) + { + return AnalysisResult::Computed(if b { 1.0 } else { 0.0 }); + } + + AnalysisResult::Delegate + } + + fn get_distinct_count( + &self, + expr: &Arc, + input_stats: &Statistics, + registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + // Column reference: use column NDV + if let Some(ndv) = Self::get_column_stats(expr, input_stats) + .and_then(|col_stats| col_stats.distinct_count.get_value().copied()) + { + return AnalysisResult::Computed(ndv); + } + + // Literal: NDV = 1 + if expr.downcast_ref::().is_some() { + return AnalysisResult::Computed(1); + } + + // BinaryExpr: addition/subtraction with a literal is always injective + // TODO: support more injective operators (e.g. multiply by non-zero) + if let Some(binary) = expr.downcast_ref::() { + let is_injective = matches!(binary.op(), Operator::Plus | Operator::Minus); + + if is_injective { + // If one side is a literal, the operation is injective on the other side + let left_is_literal = binary.left().is::(); + let right_is_literal = binary.right().is::(); + + if left_is_literal + && let Some(ndv) = + registry.get_distinct_count(binary.right(), input_stats) + { + return AnalysisResult::Computed(ndv); + } else if right_is_literal + && let Some(ndv) = + registry.get_distinct_count(binary.left(), input_stats) + { + return AnalysisResult::Computed(ndv); + } + // Both sides are non-literals: could combine, but delegate for now + } + } + + AnalysisResult::Delegate + } + + fn get_min_max( + &self, + expr: &Arc, + input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult<(ScalarValue, ScalarValue)> { + // Column reference: use column min/max + if let Some((min, max)) = + Self::get_column_stats(expr, input_stats).and_then(|col_stats| { + match ( + col_stats.min_value.get_value(), + col_stats.max_value.get_value(), + ) { + (Some(min), 
Some(max)) => Some((min.clone(), max.clone())), + _ => None, + } + }) + { + return AnalysisResult::Computed((min, max)); + } + + // Literal: min = max = value + if let Some(lit_expr) = expr.downcast_ref::() { + let val = lit_expr.value().clone(); + return AnalysisResult::Computed((val.clone(), val)); + } + + AnalysisResult::Delegate + } + + fn get_null_fraction( + &self, + expr: &Arc, + input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + // Column reference: null_count / num_rows + if let Some(fraction) = + Self::get_column_stats(expr, input_stats).and_then(|col_stats| { + let null_count = col_stats.null_count.get_value().copied()?; + let num_rows = input_stats.num_rows.get_value().copied()?; + if num_rows > 0 { + Some(null_count as f64 / num_rows as f64) + } else { + None + } + }) + { + return AnalysisResult::Computed(fraction); + } + + // Literal: null fraction depends on whether it's null + if let Some(lit_expr) = expr.downcast_ref::() { + let is_null = lit_expr.value().is_null(); + return AnalysisResult::Computed(if is_null { 1.0 } else { 0.0 }); + } + + AnalysisResult::Delegate + } +} diff --git a/datafusion/physical-expr/src/expression_analyzer/mod.rs b/datafusion/physical-expr/src/expression_analyzer/mod.rs new file mode 100644 index 0000000000000..6cbfa86034297 --- /dev/null +++ b/datafusion/physical-expr/src/expression_analyzer/mod.rs @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pluggable expression-level statistics analysis. +//! +//! This module provides an extensible mechanism for computing expression-level +//! statistics metadata (selectivity, NDV, min/max bounds) following the chain +//! of responsibility pattern. +//! +//! # Overview +//! +//! Different expressions have different statistical properties: +//! +//! - **Injective functions** (UPPER, LOWER, ABS on non-negative): preserve NDV +//! - **Non-injective functions** (FLOOR, YEAR, SUBSTRING): reduce NDV +//! - **Monotonic functions**: allow min/max bound propagation +//! - **Constants**: NDV = 1, selectivity depends on value +//! +//! The default implementation uses classic Selinger-style estimation. Users can +//! register custom [`ExpressionAnalyzer`] implementations to: +//! +//! 1. Provide statistics for custom UDFs +//! 2. Override default estimation with domain-specific knowledge +//! 3. Plug in advanced approaches (e.g., histogram-based estimation) +//! +//! # Example +//! +//! ```ignore +//! use datafusion_physical_expr::expression_analyzer::*; +//! +//! // Create registry with default analyzer +//! let mut registry = ExpressionAnalyzerRegistry::new(); +//! +//! // Register custom analyzer (higher priority) +//! registry.register(Arc::new(MyCustomAnalyzer)); +//! +//! // Query expression statistics +//! let selectivity = registry.get_selectivity(&predicate, &input_stats); +//! 
``` + +mod default; + +#[cfg(test)] +mod tests; + +pub use default::DefaultExpressionAnalyzer; + +use std::fmt::Debug; +use std::sync::Arc; + +use datafusion_common::{ScalarValue, Statistics}; + +use crate::PhysicalExpr; + +/// Result of expression analysis - either computed or delegate to next analyzer. +#[derive(Debug, Clone)] +pub enum AnalysisResult { + /// Analysis was performed, here's the result + Computed(T), + /// This analyzer doesn't handle this expression; delegate to next + Delegate, +} + +/// Expression-level metadata analysis. +/// +/// Implementations can handle specific expression types or provide domain +/// knowledge for custom UDFs. The chain of analyzers is traversed until one +/// returns [`AnalysisResult::Computed`]. +/// +/// The `registry` parameter allows analyzers to delegate sub-expression +/// analysis back through the full chain, rather than hard-coding a specific +/// analyzer. For example, a function analyzer can ask the registry for the +/// NDV of its input argument, which will traverse the full chain (including +/// any custom analyzers the user registered). +/// +/// # Implementing a Custom Analyzer +/// +/// ```ignore +/// #[derive(Debug)] +/// struct MyUdfAnalyzer; +/// +/// impl ExpressionAnalyzer for MyUdfAnalyzer { +/// fn get_selectivity( +/// &self, +/// expr: &Arc, +/// input_stats: &Statistics, +/// registry: &ExpressionAnalyzerRegistry, +/// ) -> AnalysisResult { +/// // Recognize my custom is_valid_email() UDF +/// if is_my_email_validator(expr) { +/// return AnalysisResult::Computed(0.8); // ~80% valid +/// } +/// AnalysisResult::Delegate +/// } +/// } +/// ``` +pub trait ExpressionAnalyzer: Debug + Send + Sync { + /// Estimate selectivity when this expression is used as a predicate. + /// + /// Returns a value in [0.0, 1.0] representing the fraction of rows + /// that satisfy the predicate. 
+ fn get_selectivity( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Delegate + } + + /// Estimate the number of distinct values in the expression's output. + /// + /// Properties: + /// - Injective functions preserve input NDV + /// - Non-injective functions reduce NDV (e.g., FLOOR, YEAR) + /// - Constants have NDV = 1 + fn get_distinct_count( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Delegate + } + + /// Estimate min/max bounds of the expression's output. + /// + /// Monotonic functions can transform input bounds: + /// - Increasing: (f(min), f(max)) + /// - Decreasing: (f(max), f(min)) + /// - Non-monotonic: may need wider bounds or return Delegate + fn get_min_max( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult<(ScalarValue, ScalarValue)> { + AnalysisResult::Delegate + } + + /// Estimate the fraction of null values in the expression's output. + /// + /// Returns a value in [0.0, 1.0]. + fn get_null_fraction( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Delegate + } +} + +/// Registry that chains [`ExpressionAnalyzer`] implementations. +/// +/// Analyzers are tried in order; the first to return [`AnalysisResult::Computed`] +/// wins. Register domain-specific analyzers before the default for override. +#[derive(Debug, Clone)] +pub struct ExpressionAnalyzerRegistry { + analyzers: Vec>, +} + +impl Default for ExpressionAnalyzerRegistry { + fn default() -> Self { + Self::new() + } +} + +impl ExpressionAnalyzerRegistry { + /// Create a registry pre-populated with [`DefaultExpressionAnalyzer`]. 
+ pub fn new() -> Self { + Self { + analyzers: vec![Arc::new(DefaultExpressionAnalyzer)], + } + } + + /// Create a registry with only the given analyzers and no built-in fallback. + /// + /// If none of the provided analyzers can handle a request, the registry + /// returns `None` and the caller applies its own default (e.g. + /// `default_selectivity` for filters). [`DefaultExpressionAnalyzer`] is + /// the built-in analyzer that handles common patterns (equality, + /// AND/OR, literals); use [`with_analyzers_and_default`] to include it + /// as a fallback after your custom analyzers. + /// + /// [`with_analyzers_and_default`]: ExpressionAnalyzerRegistry::with_analyzers_and_default + pub fn with_analyzers_only(analyzers: Vec>) -> Self { + Self { analyzers } + } + + /// Create a registry with custom analyzers followed by the + /// [`DefaultExpressionAnalyzer`] as fallback. + pub fn with_analyzers_and_default( + analyzers: impl IntoIterator>, + ) -> Self { + let mut all: Vec> = analyzers.into_iter().collect(); + all.push(Arc::new(DefaultExpressionAnalyzer)); + Self { analyzers: all } + } + + /// Register an analyzer at the front of the chain (higher priority). + pub fn register(&mut self, analyzer: Arc) { + self.analyzers.insert(0, analyzer); + } + + /// Get selectivity through the analyzer chain. + pub fn get_selectivity( + &self, + expr: &Arc, + input_stats: &Statistics, + ) -> Option { + for analyzer in &self.analyzers { + if let AnalysisResult::Computed(sel) = + analyzer.get_selectivity(expr, input_stats, self) + { + return Some(sel.clamp(0.0, 1.0)); + } + } + None + } + + /// Get distinct count through the analyzer chain. + pub fn get_distinct_count( + &self, + expr: &Arc, + input_stats: &Statistics, + ) -> Option { + for analyzer in &self.analyzers { + if let AnalysisResult::Computed(ndv) = + analyzer.get_distinct_count(expr, input_stats, self) + { + return Some(ndv); + } + } + None + } + + /// Get min/max bounds through the analyzer chain. 
+    pub fn get_min_max(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> Option<(ScalarValue, ScalarValue)> {
+        for analyzer in &self.analyzers {
+            if let AnalysisResult::Computed(bounds) =
+                analyzer.get_min_max(expr, input_stats, self)
+            {
+                return Some(bounds);
+            }
+        }
+        None
+    }
+
+    /// Get null fraction through the analyzer chain.
+    pub fn get_null_fraction(
+        &self,
+        expr: &Arc<dyn PhysicalExpr>,
+        input_stats: &Statistics,
+    ) -> Option<f64> {
+        for analyzer in &self.analyzers {
+            if let AnalysisResult::Computed(frac) =
+                analyzer.get_null_fraction(expr, input_stats, self)
+            {
+                return Some(frac.clamp(0.0, 1.0));
+            }
+        }
+        None
+    }
+}
diff --git a/datafusion/physical-expr/src/expression_analyzer/tests.rs b/datafusion/physical-expr/src/expression_analyzer/tests.rs
new file mode 100644
index 0000000000000..c5f5af6337863
--- /dev/null
+++ b/datafusion/physical-expr/src/expression_analyzer/tests.rs
@@ -0,0 +1,497 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +use super::*; +use crate::PhysicalExpr; +use crate::expressions::{BinaryExpr, Column, Literal, NotExpr}; +use datafusion_common::stats::Precision; +use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; +use datafusion_expr::Operator; +use std::sync::Arc; + +fn make_stats_with_ndvs(num_rows: usize, ndvs: &[usize]) -> Statistics { + Statistics { + num_rows: Precision::Exact(num_rows), + total_byte_size: Precision::Absent, + column_statistics: ndvs + .iter() + .map(|&ndv| ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Exact(ndv), + byte_size: Precision::Absent, + }) + .collect(), + } +} + +// NDV tests + +#[test] +fn test_column_ndv() { + let stats = make_stats_with_ndvs(1000, &[100]); + let col = Arc::new(Column::new("a", 0)) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + assert_eq!(registry.get_distinct_count(&col, &stats), Some(100)); +} + +#[test] +fn test_literal_ndv() { + let stats = make_stats_with_ndvs(1000, &[100]); + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + assert_eq!(registry.get_distinct_count(&lit, &stats), Some(1)); +} + +#[test] +fn test_arithmetic_ndv() { + let stats = make_stats_with_ndvs(1000, &[100]); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + // col + 1: injective, preserves NDV + let plus = Arc::new(BinaryExpr::new( + Arc::clone(&col), + Operator::Plus, + Arc::clone(&lit), + )) as Arc; + assert_eq!(registry.get_distinct_count(&plus, &stats), Some(100)); + + // col - 1: injective, preserves NDV + let minus = Arc::new(BinaryExpr::new( + Arc::clone(&col), + Operator::Minus, + Arc::clone(&lit), + )) as Arc; + assert_eq!(registry.get_distinct_count(&minus, &stats), 
Some(100)); + + // 1 + col: also injective (literal on left) + let plus_rev = + Arc::new(BinaryExpr::new(lit, Operator::Plus, col)) as Arc; + assert_eq!(registry.get_distinct_count(&plus_rev, &stats), Some(100)); +} + +// Selectivity tests + +#[test] +fn test_equality_selectivity() { + let stats = make_stats_with_ndvs(1000, &[100]); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let eq = Arc::new(BinaryExpr::new(col, Operator::Eq, lit)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + assert!((sel - 0.01).abs() < 0.001); // 1/NDV = 1/100 +} + +#[test] +fn test_equality_selectivity_column_on_right() { + let stats = make_stats_with_ndvs(1000, &[100]); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let eq = Arc::new(BinaryExpr::new(lit, Operator::Eq, col)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + assert!((sel - 0.01).abs() < 0.001); +} + +#[test] +fn test_equality_selectivity_no_ndv_delegates() { + // When the column has no distinct_count, resolve_ndv must return None + // so the predicate delegates rather than using the literal's NDV (1) as + // the denominator, which would produce selectivity = 1.0. + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics::default()], + }; + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let eq = Arc::new(BinaryExpr::new(col, Operator::Eq, lit)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + assert_eq!(registry.get_selectivity(&eq, &stats), None); +} + +#[test] +fn test_and_selectivity() { + // Both children have NDV, so AND can be computed. 
+ let stats = make_stats_with_ndvs(1000, &[100, 50]); + let col_a = Arc::new(Column::new("a", 0)) as Arc; + let col_b = Arc::new(Column::new("b", 1)) as Arc; + let lit1 = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let lit2 = + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))) as Arc; + + // a = 42 AND b = 10: 1/100 * 1/50 = 0.0002 + let eq_a = + Arc::new(BinaryExpr::new(col_a, Operator::Eq, lit1)) as Arc; + let eq_b = + Arc::new(BinaryExpr::new(col_b, Operator::Eq, lit2)) as Arc; + let and_expr = + Arc::new(BinaryExpr::new(eq_a, Operator::And, eq_b)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&and_expr, &stats).unwrap(); + assert!((sel - 0.0002).abs() < 1e-6); // 0.01 * 0.02 + + // When a child has no NDV (column stats absent), its selectivity is unknown, + // so AND cannot produce an estimate and must delegate. + let stats_no_ndv = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics::default()], + }; + // c = 1: column c has no distinct_count, so resolve_ndv returns None -> Delegate + let eq_no_ndv = Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 0)) as Arc, + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc, + )) as Arc; + // c > 5: no range selectivity without statistics -> Delegate + let gt_no_ndv = Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 0)) as Arc, + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(5)))) as Arc, + )) as Arc; + let and_no_info = Arc::new(BinaryExpr::new(eq_no_ndv, Operator::And, gt_no_ndv)) + as Arc; + assert_eq!(registry.get_selectivity(&and_no_info, &stats_no_ndv), None); +} + +#[test] +fn test_or_selectivity() { + // Both children have NDV, so OR can use inclusion-exclusion. 
+ let stats = make_stats_with_ndvs(1000, &[100, 50]); + let col_a = Arc::new(Column::new("a", 0)) as Arc; + let col_b = Arc::new(Column::new("b", 1)) as Arc; + let lit1 = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let lit2 = + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))) as Arc; + + // a = 42 OR b = 10: 0.01 + 0.02 - 0.0002 = 0.0298 + let eq_a = + Arc::new(BinaryExpr::new(col_a, Operator::Eq, lit1)) as Arc; + let eq_b = + Arc::new(BinaryExpr::new(col_b, Operator::Eq, lit2)) as Arc; + let or_expr = + Arc::new(BinaryExpr::new(eq_a, Operator::Or, eq_b)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&or_expr, &stats).unwrap(); + assert!((sel - 0.0298).abs() < 1e-6); // 0.01 + 0.02 - 0.01*0.02 + + // When a child has no NDV (column stats absent), its selectivity is unknown, + // so OR cannot produce an estimate and must delegate. + let stats_no_ndv = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics::default()], + }; + // c = 1: column c has no distinct_count, so resolve_ndv returns None -> Delegate + let eq_no_ndv = Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 0)) as Arc, + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc, + )) as Arc; + // c > 5: no range selectivity without statistics -> Delegate + let gt_no_ndv = Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 0)) as Arc, + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(5)))) as Arc, + )) as Arc; + let or_no_info = Arc::new(BinaryExpr::new(eq_no_ndv, Operator::Or, gt_no_ndv)) + as Arc; + assert_eq!(registry.get_selectivity(&or_no_info, &stats_no_ndv), None); +} + +#[test] +fn test_not_selectivity() { + let stats = make_stats_with_ndvs(1000, &[100]); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + + let eq = 
Arc::new(BinaryExpr::new(col, Operator::Eq, lit)) as Arc<dyn PhysicalExpr>;
+    let not_expr = Arc::new(NotExpr::new(eq)) as Arc<dyn PhysicalExpr>;
+
+    let registry = ExpressionAnalyzerRegistry::new();
+    let sel = registry.get_selectivity(&not_expr, &stats).unwrap();
+    assert!((sel - 0.99).abs() < 0.001); // 1 - 0.01
+}
+
+#[test]
+fn test_equality_selectivity_expression_eq_literal() {
+    let stats = make_stats_with_ndvs(1000, &[100]);
+    let col = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
+    let one =
+        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
+    let forty_two =
+        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
+    let a_plus_1 =
+        Arc::new(BinaryExpr::new(col, Operator::Plus, one)) as Arc<dyn PhysicalExpr>;
+    let eq = Arc::new(BinaryExpr::new(a_plus_1, Operator::Eq, forty_two))
+        as Arc<dyn PhysicalExpr>;
+
+    let registry = ExpressionAnalyzerRegistry::new();
+    let sel = registry.get_selectivity(&eq, &stats).unwrap();
+    // NDV(a + 1) = NDV(a) = 100, so selectivity = 1/100 = 0.01
+    assert!((sel - 0.01).abs() < 0.001);
+}
+
+#[test]
+fn test_inequality_selectivity_expression_neq_literal() {
+    let stats = make_stats_with_ndvs(1000, &[100]);
+    let col = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
+    let one =
+        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
+    let forty_two =
+        Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
+    let a_plus_1 =
+        Arc::new(BinaryExpr::new(col, Operator::Plus, one)) as Arc<dyn PhysicalExpr>;
+    let neq = Arc::new(BinaryExpr::new(a_plus_1, Operator::NotEq, forty_two))
+        as Arc<dyn PhysicalExpr>;
+
+    let registry = ExpressionAnalyzerRegistry::new();
+    let sel = registry.get_selectivity(&neq, &stats).unwrap();
+    // NDV(a + 1) = 100, selectivity = 1 - 1/100 = 0.99
+    assert!((sel - 0.99).abs() < 0.001);
+}
+
+// Tests for resolve_ndv symmetry (column-vs-column and column-vs-expression)
+
+#[test]
+fn test_equality_selectivity_column_eq_column_symmetric() {
+    // a = b and b = a must give the same selectivity regardless of operand order
+    let stats = make_stats_with_ndvs(1000, &[50, 200]);
+    let col_a =
Arc::new(Column::new("a", 0)) as Arc; + let col_b = Arc::new(Column::new("b", 1)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + + let eq_ab = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + Operator::Eq, + Arc::clone(&col_b), + )) as Arc; + let eq_ba = Arc::new(BinaryExpr::new( + Arc::clone(&col_b), + Operator::Eq, + Arc::clone(&col_a), + )) as Arc; + + let sel_ab = registry.get_selectivity(&eq_ab, &stats).unwrap(); + let sel_ba = registry.get_selectivity(&eq_ba, &stats).unwrap(); + + assert_eq!(sel_ab, sel_ba); +} + +#[test] +fn test_equality_selectivity_column_eq_expression_uses_max_ndv() { + // col = (expr) should use max(ndv(col), ndv(expr)), not just ndv(col) + // Here col has ndv=10 and expr=(b+1) has ndv=200 from column b + let stats = make_stats_with_ndvs(1000, &[10, 200]); + let col_a = Arc::new(Column::new("a", 0)) as Arc; + let col_b = Arc::new(Column::new("b", 1)) as Arc; + let one = + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))) as Arc; + let b_plus_1 = + Arc::new(BinaryExpr::new(col_b, Operator::Plus, one)) as Arc; + + let eq = + Arc::new(BinaryExpr::new(col_a, Operator::Eq, b_plus_1)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + // max(ndv(a)=10, ndv(b+1)=200) = 200, so selectivity = 1/200 + assert_eq!(sel, 1.0 / 200.0); +} + +// Min/max tests + +#[test] +fn test_column_min_max() { + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Absent, + null_count: Precision::Exact(0), + sum_value: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + let col = Arc::new(Column::new("a", 0)) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + assert_eq!( + registry.get_min_max(&col, &stats), + 
Some((ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(100)))) + ); +} + +#[test] +fn test_literal_min_max() { + let stats = make_stats_with_ndvs(100, &[10]); + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + assert_eq!( + registry.get_min_max(&lit, &stats), + Some((ScalarValue::Int32(Some(42)), ScalarValue::Int32(Some(42)))) + ); +} + +// Null fraction tests + +#[test] +fn test_column_null_fraction() { + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(250), + min_value: Precision::Absent, + max_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + let col = Arc::new(Column::new("a", 0)) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + let frac = registry.get_null_fraction(&col, &stats).unwrap(); + assert!((frac - 0.25).abs() < 0.001); +} + +#[test] +fn test_literal_null_fraction() { + let stats = make_stats_with_ndvs(100, &[10]); + let registry = ExpressionAnalyzerRegistry::new(); + + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + assert_eq!(registry.get_null_fraction(&lit, &stats), Some(0.0)); + + let null_lit = + Arc::new(Literal::new(ScalarValue::Int32(None))) as Arc; + assert_eq!(registry.get_null_fraction(&null_lit, &stats), Some(1.0)); +} + +// Custom analyzer tests + +#[derive(Debug)] +struct FixedSelectivityAnalyzer(f64); + +impl ExpressionAnalyzer for FixedSelectivityAnalyzer { + fn get_selectivity( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Computed(self.0) + } +} + +#[test] +fn test_custom_analyzer_overrides_default() { + let stats = make_stats_with_ndvs(1000, &[100]); + let col = Arc::new(Column::new("a", 0)) as Arc; + let 
lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let eq = Arc::new(BinaryExpr::new(col, Operator::Eq, lit)) as Arc; + + let mut registry = ExpressionAnalyzerRegistry::new(); + registry.register(Arc::new(FixedSelectivityAnalyzer(0.42))); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + assert!((sel - 0.42).abs() < 0.001); +} + +#[derive(Debug)] +struct ColumnAOnlyAnalyzer; + +impl ExpressionAnalyzer for ColumnAOnlyAnalyzer { + fn get_selectivity( + &self, + expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + if let Some(binary) = expr.downcast_ref::() + && let Some(col) = binary.left().downcast_ref::() + && col.name() == "a" + && matches!(binary.op(), Operator::Eq) + { + return AnalysisResult::Computed(0.99); + } + AnalysisResult::Delegate + } +} + +#[test] +fn test_custom_analyzer_delegates_to_default() { + let stats = make_stats_with_ndvs(1000, &[100]); + let col_a = Arc::new(Column::new("a", 0)) as Arc; + let col_b = Arc::new(Column::new("b", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + + let eq_a = Arc::new(BinaryExpr::new(col_a, Operator::Eq, Arc::clone(&lit))) + as Arc; + let eq_b = + Arc::new(BinaryExpr::new(col_b, Operator::Eq, lit)) as Arc; + + let mut registry = ExpressionAnalyzerRegistry::new(); + registry.register(Arc::new(ColumnAOnlyAnalyzer)); + + let sel_a = registry.get_selectivity(&eq_a, &stats).unwrap(); + assert!((sel_a - 0.99).abs() < 0.001); + + let sel_b = registry.get_selectivity(&eq_b, &stats).unwrap(); + assert!((sel_b - 0.01).abs() < 0.001); +} + +#[test] +fn test_with_analyzers_and_default() { + let stats = make_stats_with_ndvs(1000, &[100]); + let col = Arc::new(Column::new("a", 0)) as Arc; + + let registry = + ExpressionAnalyzerRegistry::with_analyzers_and_default(vec![Arc::new( + ColumnAOnlyAnalyzer, + ) + as Arc]); + + assert_eq!(registry.get_distinct_count(&col, &stats), Some(100)); +} diff --git 
a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index bedd348dab92f..0e5f3945bf571 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -33,6 +33,7 @@ pub mod binary_map { } pub mod async_scalar_function; pub mod equivalence; +pub mod expression_analyzer; pub mod expressions; pub mod intervals; mod partitioning; diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 62c4d425706d7..2249d496ab86c 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -21,6 +21,7 @@ use std::ops::Deref; use std::sync::Arc; use crate::PhysicalExpr; +use crate::expression_analyzer::ExpressionAnalyzerRegistry; use crate::expressions::{Column, Literal}; use crate::scalar_function::ScalarFunctionExpr; use crate::utils::collect_columns; @@ -125,12 +126,22 @@ impl From for (Arc, String) { /// /// See [`ProjectionExprs::from_indices`] to select a subset of columns by /// indices. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct ProjectionExprs { /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. 
exprs: Arc<[ProjectionExpr]>, + /// Optional expression analyzer registry for statistics estimation + expression_analyzer_registry: Option>, +} + +impl PartialEq for ProjectionExprs { + fn eq(&self, other: &Self) -> bool { + self.exprs == other.exprs + } } +impl Eq for ProjectionExprs {} + impl std::fmt::Display for ProjectionExprs { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let exprs: Vec = self.exprs.iter().map(|e| e.to_string()).collect(); @@ -142,6 +153,7 @@ impl From> for ProjectionExprs { fn from(value: Vec) -> Self { Self { exprs: value.into(), + expression_analyzer_registry: None, } } } @@ -150,6 +162,7 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { exprs: value.iter().cloned().collect(), + expression_analyzer_registry: None, } } } @@ -158,6 +171,7 @@ impl FromIterator for ProjectionExprs { fn from_iter>(exprs: T) -> Self { Self { exprs: exprs.into_iter().collect(), + expression_analyzer_registry: None, } } } @@ -173,6 +187,7 @@ impl ProjectionExprs { pub fn new(exprs: impl IntoIterator) -> Self { Self { exprs: exprs.into_iter().collect(), + expression_analyzer_registry: None, } } @@ -180,9 +195,23 @@ impl ProjectionExprs { pub fn from_expressions(exprs: impl Into>) -> Self { Self { exprs: exprs.into(), + expression_analyzer_registry: None, } } + /// Set the expression analyzer registry for statistics estimation. + /// + /// The physical planner injects the registry at plan creation time and + /// re-injects it after each physical optimizer rule, so projections + /// created by optimizer rules also receive the registry. + pub fn with_expression_analyzer_registry( + mut self, + registry: Arc, + ) -> Self { + self.expression_analyzer_registry = Some(registry); + self + } + /// Creates a [`ProjectionExpr`] from a list of column indices. 
/// /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column @@ -656,6 +685,15 @@ impl ProjectionExprs { mut stats: Statistics, output_schema: &Schema, ) -> Result { + // Snapshot for analyzer lookups + let original_stats = + self.expression_analyzer_registry + .as_ref() + .map(|_| Statistics { + num_rows: stats.num_rows, + total_byte_size: stats.total_byte_size, + column_statistics: stats.column_statistics.clone(), + }); let mut column_statistics = Vec::with_capacity(self.exprs.len()); for proj_expr in self.exprs.iter() { @@ -713,9 +751,36 @@ impl ProjectionExprs { byte_size, } } + } else if let Some(registry) = &self.expression_analyzer_registry { + // Use ExpressionAnalyzer to estimate statistics for arbitrary expressions + let original_stats = original_stats.as_ref().unwrap(); + let distinct_count = registry + .get_distinct_count(expr, original_stats) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent); + let (min_value, max_value) = registry + .get_min_max(expr, original_stats) + .map(|(min, max)| (Precision::Inexact(min), Precision::Inexact(max))) + .unwrap_or((Precision::Absent, Precision::Absent)); + let null_count = registry + .get_null_fraction(expr, original_stats) + .and_then(|frac| { + original_stats + .num_rows + .get_value() + .map(|&rows| (rows as f64 * frac).ceil() as usize) + }) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent); + + ColumnStatistics { + distinct_count, + min_value, + max_value, + null_count, + ..ColumnStatistics::new_unknown() + } } else { - // TODO stats: estimate more statistics from expressions - // (expressions should compute their statistics themselves) ColumnStatistics::new_unknown() }; column_statistics.push(col_stats); @@ -806,6 +871,19 @@ impl Projector { pub fn projection(&self) -> &ProjectionExprs { &self.projection } + + /// Set the expression analyzer registry on the underlying projection + pub fn 
set_expression_analyzer_registry( + &mut self, + registry: Arc, + ) { + self.projection.expression_analyzer_registry = Some(registry); + } + + /// Get the expression analyzer registry, if set + pub fn expression_analyzer_registry(&self) -> Option<&ExpressionAnalyzerRegistry> { + self.projection.expression_analyzer_registry.as_deref() + } } /// Describes an immutable reference counted projection. @@ -2772,7 +2850,7 @@ pub(crate) mod tests { // Should have 2 column statistics assert_eq!(output_stats.column_statistics.len(), 2); - // First column (expression) should have unknown statistics + // First column (col0 + 1): no registry set, so statistics are unknown assert_eq!( output_stats.column_statistics[0].distinct_count, Precision::Absent @@ -2791,6 +2869,49 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn test_project_statistics_with_expression_analyzer() -> Result<()> { + let input_stats = get_stats(); + let input_schema = get_schema(); + + // Same projection as test_project_statistics_with_expressions, + // but with the analyzer registry enabled + let projection = ProjectionExprs::new(vec![ + ProjectionExpr { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("col0", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))), + )), + alias: "incremented".to_string(), + }, + ProjectionExpr { + expr: Arc::new(Column::new("col1", 1)), + alias: "text".to_string(), + }, + ]) + .with_expression_analyzer_registry(Arc::new(ExpressionAnalyzerRegistry::new())); + + let output_stats = projection.project_statistics( + input_stats, + &projection.project_schema(&input_schema)?, + )?; + + // With analyzer: col0 + 1 is injective, NDV preserved from col0 (= 5) + assert_eq!( + output_stats.column_statistics[0].distinct_count, + Precision::Inexact(5) + ); + + // Second column (col1) still preserves exact statistics + assert_eq!( + output_stats.column_statistics[1].distinct_count, + Precision::Exact(1) + ); + + Ok(()) + } + #[test] fn 
test_project_statistics_primitive_width_only() -> Result<()> { let input_stats = get_stats(); @@ -3091,4 +3212,110 @@ pub(crate) mod tests { Ok(()) } + + #[test] + fn test_project_statistics_column_then_expression() -> Result<()> { + // SELECT a, a + 1: bare column first, then expression on same column + let input_stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Exact(100), + null_count: Precision::Exact(0), + min_value: Precision::Exact(ScalarValue::Int64(Some(1))), + max_value: Precision::Exact(ScalarValue::Int64(Some(1000))), + sum_value: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + let input_schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + + let projection = ProjectionExprs::new(vec![ + ProjectionExpr { + expr: Arc::new(Column::new("a", 0)), + alias: "a".to_string(), + }, + ProjectionExpr { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))), + )), + alias: "a_plus_1".to_string(), + }, + ]) + .with_expression_analyzer_registry(Arc::new(ExpressionAnalyzerRegistry::new())); + + let output_stats = projection.project_statistics( + input_stats, + &projection.project_schema(&input_schema)?, + )?; + + // Bare column: exact stats preserved + assert_eq!( + output_stats.column_statistics[0].distinct_count, + Precision::Exact(100) + ); + + // Expression on same column: analyzer should still see a's NDV + assert_eq!( + output_stats.column_statistics[1].distinct_count, + Precision::Inexact(100) + ); + + Ok(()) + } + + #[test] + fn test_project_statistics_expression_then_column() -> Result<()> { + // SELECT a + 1, a: expression first, then bare column + let input_stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + distinct_count: 
Precision::Exact(100), + null_count: Precision::Exact(0), + min_value: Precision::Exact(ScalarValue::Int64(Some(1))), + max_value: Precision::Exact(ScalarValue::Int64(Some(1000))), + sum_value: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + let input_schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + + let projection = ProjectionExprs::new(vec![ + ProjectionExpr { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))), + )), + alias: "a_plus_1".to_string(), + }, + ProjectionExpr { + expr: Arc::new(Column::new("a", 0)), + alias: "a".to_string(), + }, + ]) + .with_expression_analyzer_registry(Arc::new(ExpressionAnalyzerRegistry::new())); + + let output_stats = projection.project_statistics( + input_stats, + &projection.project_schema(&input_schema)?, + )?; + + // Expression: analyzer sees a's NDV (no take yet) + assert_eq!( + output_stats.column_statistics[0].distinct_count, + Precision::Inexact(100) + ); + + // Bare column: exact stats preserved + assert_eq!( + output_stats.column_statistics[1].distinct_count, + Precision::Exact(100) + ); + + Ok(()) + } } diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 05df642f8446b..2a232bfa8eb42 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -42,6 +42,7 @@ use crate::pushdown_sort::PushdownSort; use crate::window_topn::WindowTopN; use datafusion_common::Result; use datafusion_common::config::ConfigOptions; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::operator_statistics::StatisticsRegistry; @@ -62,6 +63,11 @@ pub trait PhysicalOptimizerContext: Send + Sync { fn statistics_registry(&self) -> Option<&StatisticsRegistry> { None } + + /// Returns the expression analyzer 
registry, if `use_expression_analyzer` is enabled. + fn expression_analyzer_registry(&self) -> Option<&Arc> { + None + } } /// Simple context wrapping [`ConfigOptions`] for backward compatibility. diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 76ecb3f1485a4..1e78976ec7a5b 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -35,6 +35,7 @@ use crate::{ SendableRecordBatchStream, Statistics, check_if_same_properties, }; use datafusion_common::config::ConfigOptions; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use datafusion_physical_expr::utils::collect_columns; use parking_lot::Mutex; use std::collections::{HashMap, HashSet}; @@ -670,6 +671,9 @@ pub struct AggregateExec { /// it remains `Some(..)` to enable dynamic filtering during aggregate execution; /// otherwise, it is cleared to `None`. dynamic_filter: Option>, + /// Registry for expression-level statistics estimation. + /// Set when `use_expression_analyzer` is enabled. 
+ expression_analyzer_registry: Option>, } impl AggregateExec { @@ -695,6 +699,7 @@ impl AggregateExec { schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), dynamic_filter: self.dynamic_filter.clone(), + expression_analyzer_registry: self.expression_analyzer_registry.clone(), } } @@ -715,6 +720,7 @@ impl AggregateExec { schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), dynamic_filter: self.dynamic_filter.clone(), + expression_analyzer_registry: self.expression_analyzer_registry.clone(), } } @@ -849,6 +855,7 @@ impl AggregateExec { input_order_mode, cache: Arc::new(cache), dynamic_filter: None, + expression_analyzer_registry: None, }; exec.init_dynamic_filter(); @@ -1555,6 +1562,26 @@ impl ExecutionPlan for AggregateExec { Some(self.metrics.clone_inner()) } + fn uses_expression_level_statistics(&self) -> bool { + true + } + + fn with_expression_analyzer_registry( + &self, + registry: &Arc, + ) -> Option> { + if self.expression_analyzer_registry.is_some() { + return None; + } + let mut new_exec = self.clone(); + new_exec.expression_analyzer_registry = Some(Arc::clone(registry)); + Some(Arc::new(new_exec)) + } + + fn expression_analyzer_registry(&self) -> Option<&ExpressionAnalyzerRegistry> { + self.expression_analyzer_registry.as_deref() + } + fn partition_statistics(&self, partition: Option) -> Result> { let child_statistics = self.input().partition_statistics(partition)?; Ok(Arc::new(self.statistics_inner(&child_statistics)?)) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 1a67ea0ded11b..9646ad7df3b6d 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -59,6 +59,7 @@ use datafusion_common::{ use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; +use 
datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use datafusion_physical_expr_common::sort_expr::{ LexOrdering, OrderingRequirements, PhysicalSortExpr, }; @@ -563,6 +564,35 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { Ok(Arc::new(Statistics::new_unknown(&self.schema()))) } + /// Whether this node could benefit from expression-level statistics + /// (NDV, selectivity, min/max) to compute its output statistics. + /// + /// Nodes that return `true` must also override + /// [`Self::with_expression_analyzer_registry`] to accept the registry and + /// [`Self::expression_analyzer_registry`] to expose it. + /// + /// See `FilterExec`, `ProjectionExec`, `AggregateExec`, and `HashJoinExec` + /// for reference implementations. + fn uses_expression_level_statistics(&self) -> bool { + false + } + + /// Accepts an [`ExpressionAnalyzerRegistry`] and returns a new plan node + /// with the registry stored, or `None` if this node does not use + /// expression-level statistics or a registry is already set. + fn with_expression_analyzer_registry( + &self, + _registry: &Arc, + ) -> Option> { + None + } + + /// Returns the [`ExpressionAnalyzerRegistry`] if one has been injected + /// into this node, or `None` otherwise. + fn expression_analyzer_registry(&self) -> Option<&ExpressionAnalyzerRegistry> { + None + } + /// Returns `true` if a limit can be safely pushed down through this /// `ExecutionPlan` node. 
/// diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 50efe8f5092e8..7926132b8b670 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -21,6 +21,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use itertools::Itertools; @@ -95,6 +96,8 @@ pub struct FilterExec { batch_size: usize, /// Number of rows to fetch fetch: Option, + /// Optional expression analyzer registry for selectivity estimation + expression_analyzer_registry: Option>, } /// Builder for [`FilterExec`] to set optional parameters @@ -214,6 +217,7 @@ impl FilterExecBuilder { projection: self.projection, batch_size: self.batch_size, fetch: self.fetch, + expression_analyzer_registry: None, }) } } @@ -287,6 +291,7 @@ impl FilterExec { projection: self.projection.clone(), batch_size, fetch: self.fetch, + expression_analyzer_registry: self.expression_analyzer_registry.clone(), }) } @@ -320,6 +325,7 @@ impl FilterExec { input_stats: Statistics, predicate: &Arc, default_selectivity: u8, + expression_analyzer_registry: Option<&ExpressionAnalyzerRegistry>, ) -> Result { let (eq_columns, is_infeasible) = collect_equality_columns(predicate); @@ -340,9 +346,11 @@ impl FilterExec { } (0.0, Precision::Exact(0), cs) } else if !check_support(predicate, schema) { - // Interval analysis is not applicable; fall back to the default - // selectivity but still pin NDV=1 for every `col = literal` column. - let selectivity = default_selectivity as f64 / 100.0; + // Interval analysis is not applicable. Use ExpressionAnalyzer for + // better selectivity when available, fall back to default_selectivity. 
+ let selectivity = expression_analyzer_registry + .and_then(|r| r.get_selectivity(predicate, &input_stats)) + .unwrap_or(default_selectivity as f64 / 100.0); let mut cs = input_stats.to_inexact().column_statistics; for &idx in &eq_columns { if idx < cs.len() && cs[idx].distinct_count != Precision::Exact(0) { @@ -403,6 +411,7 @@ impl FilterExec { Arc::unwrap_or_clone(input.partition_statistics(None)?), predicate, default_selectivity, + None, )?; let mut eq_properties = input.equivalence_properties().clone(); let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate); @@ -574,6 +583,26 @@ impl ExecutionPlan for FilterExec { Some(self.metrics.clone_inner()) } + fn uses_expression_level_statistics(&self) -> bool { + true + } + + fn with_expression_analyzer_registry( + &self, + registry: &Arc, + ) -> Option> { + if self.expression_analyzer_registry.is_some() { + return None; + } + let mut new_exec = self.clone(); + new_exec.expression_analyzer_registry = Some(Arc::clone(registry)); + Some(Arc::new(new_exec)) + } + + fn expression_analyzer_registry(&self) -> Option<&ExpressionAnalyzerRegistry> { + self.expression_analyzer_registry.as_deref() + } + /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. 
fn partition_statistics(&self, partition: Option) -> Result> { @@ -584,6 +613,7 @@ impl ExecutionPlan for FilterExec { input_stats, self.predicate(), self.default_selectivity, + self.expression_analyzer_registry.as_deref(), )?; Ok(Arc::new(stats.project(self.projection.as_ref()))) } @@ -748,6 +778,7 @@ impl ExecutionPlan for FilterExec { projection: self.projection.clone(), batch_size: self.batch_size, fetch: self.fetch, + expression_analyzer_registry: self.expression_analyzer_registry.clone(), }; Some(Arc::new(new) as _) }; @@ -772,6 +803,7 @@ impl ExecutionPlan for FilterExec { projection: self.projection.clone(), batch_size: self.batch_size, fetch, + expression_analyzer_registry: self.expression_analyzer_registry.clone(), })) } @@ -3147,6 +3179,68 @@ mod tests { /// an explicit projection must not panic when `try_swapping_with_projection` /// attempts to swap the two nodes. /// + /// Verifies that `ExpressionAnalyzerRegistry` computes selectivity for OR predicates + /// using inclusion-exclusion, which interval arithmetic cannot represent (a union of + /// two disjoint intervals is not a single interval). 
+ /// + /// For `(a = 42 OR b = 5)` with NDV_a=10, NDV_b=5 on 1000 rows: + /// - Without ExpressionAnalyzer: default 20% selectivity -> 200 rows + /// - With ExpressionAnalyzer: P(a=42) + P(b=5) - P(a=42)*P(b=5) = 0.1 + 0.2 - 0.02 = 0.28 -> 280 rows + #[tokio::test] + async fn test_filter_statistics_expression_analyzer_selectivity_or_predicate() + -> Result<()> { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Int64, false), + ]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + distinct_count: Precision::Inexact(10), + ..Default::default() + }, + ColumnStatistics { + distinct_count: Precision::Inexact(5), + ..Default::default() + }, + ], + }, + schema.clone(), + )); + // (a = 42 OR b = 5): OR is not expressible as a single interval + let predicate: Arc = Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int64(Some(42)))), + )), + Operator::Or, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int64(Some(5)))), + )), + )); + + // Without ExpressionAnalyzer: default 20% selectivity -> 200 rows + let filter = Arc::new(FilterExec::try_new(Arc::clone(&predicate), input as _)?); + let stats = filter.partition_statistics(None)?; + assert_eq!(stats.num_rows, Precision::Inexact(200)); + + // With ExpressionAnalyzer: inclusion-exclusion -> 0.1 + 0.2 - 0.02 = 0.28 -> 280 rows + let registry = Arc::new(ExpressionAnalyzerRegistry::new()); + let filter_with_registry = filter + .with_expression_analyzer_registry(®istry) + .expect("registry should be injectable when not already set"); + let stats_with_registry = filter_with_registry.partition_statistics(None)?; + assert_eq!(stats_with_registry.num_rows, Precision::Inexact(280)); + + Ok(()) 
+ } + /// Before the fix, `FilterExecBuilder::from(self)` copied the old projection /// (e.g. `[0, 1, 2]`) from the FilterExec. After `.with_input` replaced the /// input with the narrower ProjectionExec (2 columns), `.build()` tried to diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 735375441f549..f9df21853677f 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -85,6 +85,7 @@ use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumula use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit}; use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; @@ -294,6 +295,7 @@ impl HashJoinExecBuilder { null_equality: NullEquality::NullEqualsNothing, null_aware: false, dynamic_filter: None, + expression_analyzer_registry: None, // Will be computed at when plan will be built. cache: stub_properties(), join_schema: Arc::new(Schema::empty()), @@ -361,6 +363,15 @@ impl HashJoinExecBuilder { self } + /// Set expression analyzer registry. + pub fn with_expression_analyzer_registry( + mut self, + registry: Arc, + ) -> Self { + self.exec.expression_analyzer_registry = Some(registry); + self + } + /// Require to recompute plan properties. pub fn recompute_properties(mut self) -> Self { self.preserve_properties = false; @@ -438,6 +449,7 @@ impl HashJoinExecBuilder { null_aware, dynamic_filter, fetch, + expression_analyzer_registry, // Recomputed. 
join_schema: _, column_indices: _, @@ -487,6 +499,7 @@ impl HashJoinExecBuilder { cache: Arc::new(cache), dynamic_filter, fetch, + expression_analyzer_registry, }) } @@ -517,6 +530,7 @@ impl From<&HashJoinExec> for HashJoinExecBuilder { cache: Arc::clone(&exec.cache), dynamic_filter: exec.dynamic_filter.clone(), fetch: exec.fetch, + expression_analyzer_registry: exec.expression_analyzer_registry.clone(), }, preserve_properties: true, } @@ -758,6 +772,9 @@ pub struct HashJoinExec { dynamic_filter: Option, /// Maximum number of rows to return fetch: Option, + /// Registry for expression-level statistics estimation. + /// Set when `use_expression_analyzer` is enabled. + expression_analyzer_registry: Option>, } #[derive(Clone)] @@ -1441,6 +1458,30 @@ impl ExecutionPlan for HashJoinExec { Some(self.metrics.clone_inner()) } + fn uses_expression_level_statistics(&self) -> bool { + true + } + + fn with_expression_analyzer_registry( + &self, + registry: &Arc, + ) -> Option> { + if self.expression_analyzer_registry.is_some() { + return None; + } + // build_exec() validates the join configuration; since we only set the + // registry field without changing any structural fields, this cannot fail + // in practice. Map to None (no-op) in the unlikely error case. 
+ self.builder() + .with_expression_analyzer_registry(Arc::clone(registry)) + .build_exec() + .ok() + } + + fn expression_analyzer_registry(&self) -> Option<&ExpressionAnalyzerRegistry> { + self.expression_analyzer_registry.as_deref() + } + fn partition_statistics(&self, partition: Option) -> Result> { let stats = match (partition, self.mode) { // For CollectLeft mode, the left side is collected into a single partition, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 3f309431614a4..13976ceb3bc10 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -53,6 +53,7 @@ use datafusion_common::{ use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr::equivalence::join_equivalence_properties; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; @@ -130,6 +131,9 @@ pub struct SortMergeJoinExec { pub null_equality: NullEquality, /// Cache holding plan properties like equivalences, output partitioning etc. cache: Arc, + /// Registry for expression-level statistics estimation. + /// Set when `use_expression_analyzer` is enabled. 
+ expression_analyzer_registry: Option>, } impl SortMergeJoinExec { @@ -201,6 +205,7 @@ impl SortMergeJoinExec { sort_options, null_equality, cache: Arc::new(cache), + expression_analyzer_registry: None, }) } @@ -581,6 +586,26 @@ impl ExecutionPlan for SortMergeJoinExec { Some(self.metrics.clone_inner()) } + fn uses_expression_level_statistics(&self) -> bool { + true + } + + fn with_expression_analyzer_registry( + &self, + registry: &Arc, + ) -> Option> { + if self.expression_analyzer_registry.is_some() { + return None; + } + let mut new_exec = self.clone(); + new_exec.expression_analyzer_registry = Some(Arc::clone(registry)); + Some(Arc::new(new_exec)) + } + + fn expression_analyzer_registry(&self) -> Option<&ExpressionAnalyzerRegistry> { + self.expression_analyzer_registry.as_deref() + } + fn partition_statistics(&self, partition: Option) -> Result> { // SortMergeJoinExec uses symmetric hash partitioning where both left and right // inputs are hash-partitioned on the join keys. This means partition `i` of the diff --git a/datafusion/physical-plan/src/operator_statistics/mod.rs b/datafusion/physical-plan/src/operator_statistics/mod.rs index 20266e9768ebe..9329e2b462bd2 100644 --- a/datafusion/physical-plan/src/operator_statistics/mod.rs +++ b/datafusion/physical-plan/src/operator_statistics/mod.rs @@ -93,6 +93,9 @@ use std::sync::Arc; use datafusion_common::stats::Precision; use datafusion_common::{Result, Statistics}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; +use datafusion_physical_expr::expressions::Column; use crate::ExecutionPlan; @@ -318,6 +321,10 @@ impl StatisticsRegistry { /// Create a registry pre-loaded with the standard built-in providers. /// + /// Each provider reads the `ExpressionAnalyzerRegistry` from the exec node + /// it processes (set at planning time via `use_expression_analyzer`), so + /// custom analyzers are available to all built-in providers uniformly. 
+ /// /// Provider order (first match wins): /// 1. [`FilterStatisticsProvider`] /// 2. [`ProjectionStatisticsProvider`] @@ -549,7 +556,7 @@ impl StatisticsProvider for FilterStatisticsProvider { input_stats, filter.predicate(), filter.default_selectivity(), - // TODO: pass filter.expression_analyzer_registry() once #21122 lands + filter.expression_analyzer_registry(), )?; // Adjust distinct_count for each column using the selectivity ratio @@ -600,8 +607,6 @@ impl StatisticsProvider for ProjectionStatisticsProvider { let input_stats = (*child_stats[0].base).clone(); let output_schema = proj.schema(); - // TODO: pass proj.expression_analyzer_registry() once #21122 lands, - // so expression-level NDV/min/max feeds into projected column stats. let stats = proj .projection_expr() .project_statistics(input_stats, &output_schema)?; @@ -664,7 +669,7 @@ impl StatisticsProvider for PassthroughStatisticsProvider { /// - GROUP BY is empty (scalar aggregate) /// - Any GROUP BY expression is not a simple column reference /// - Any GROUP BY column lacks NDV information -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct AggregateStatisticsProvider; impl StatisticsProvider for AggregateStatisticsProvider { @@ -674,7 +679,6 @@ impl StatisticsProvider for AggregateStatisticsProvider { child_stats: &[ExtendedStatistics], ) -> Result { use crate::aggregates::AggregateExec; - use datafusion_physical_expr::expressions::Column; use crate::aggregates::AggregateMode; @@ -694,17 +698,13 @@ impl StatisticsProvider for AggregateStatisticsProvider { let input_stats = &child_stats[0].base; - // Compute NDV product of GROUP BY columns + let Some(analyzer) = agg.expression_analyzer_registry() else { + return Ok(StatisticsResult::Delegate); + }; + let mut ndv_product: Option = None; for (expr, _) in agg.group_expr().expr().iter() { - let Some(col) = expr.downcast_ref::() else { - return Ok(StatisticsResult::Delegate); - }; - let Some(&ndv) = input_stats - .column_statistics - 
.get(col.index()) - .and_then(|s| s.distinct_count.get_value()) - else { + let Some(ndv) = analyzer.get_distinct_count(expr, input_stats) else { return Ok(StatisticsResult::Delegate); }; if ndv == 0 { @@ -756,7 +756,7 @@ impl StatisticsProvider for AggregateStatisticsProvider { /// Delegates when: /// - The plan is not a supported join type /// - Either input lacks row count information -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct JoinStatisticsProvider; impl StatisticsProvider for JoinStatisticsProvider { @@ -765,9 +765,8 @@ impl StatisticsProvider for JoinStatisticsProvider { plan: &dyn ExecutionPlan, child_stats: &[ExtendedStatistics], ) -> Result { - use crate::joins::{CrossJoinExec, HashJoinExec, SortMergeJoinExec}; + use crate::joins::{CrossJoinExec, HashJoinExec, JoinOnRef, SortMergeJoinExec}; use datafusion_common::JoinType; - use datafusion_physical_expr::expressions::Column; if child_stats.len() < 2 { return Ok(StatisticsResult::Delegate); @@ -782,9 +781,25 @@ impl StatisticsProvider for JoinStatisticsProvider { return Ok(StatisticsResult::Delegate); }; - use crate::joins::JoinOnRef; + let analyzer = plan.expression_analyzer_registry(); + + /// Resolve NDV for a join key expression. Uses the ExpressionAnalyzer + /// when available (handles arbitrary expressions), otherwise falls back + /// to direct column statistics lookup (bare column keys only). 
+ fn resolve_key_ndv( + key: &Arc, + stats: &Statistics, + analyzer: Option<&ExpressionAnalyzerRegistry>, + ) -> Option { + if let Some(a) = analyzer { + return a.get_distinct_count(key, stats); + } + key.downcast_ref::() + .and_then(|c| stats.column_statistics.get(c.index())) + .and_then(|s| s.distinct_count.get_value().copied()) + } - /// Estimate equi-join output using NDV of join key columns: + /// Estimate equi-join output using NDV of join key expressions: /// left_rows * right_rows / product(max(left_ndv_i, right_ndv_i)) /// Falls back to Cartesian product if any key lacks NDV on both sides. fn equi_join_estimate( @@ -793,20 +808,15 @@ impl StatisticsProvider for JoinStatisticsProvider { right: &Statistics, left_rows: usize, right_rows: usize, + analyzer: Option<&ExpressionAnalyzerRegistry>, ) -> usize { if on.is_empty() { return left_rows.saturating_mul(right_rows); } let mut ndv_divisor: usize = 1; for (left_key, right_key) in on { - let left_ndv = left_key - .downcast_ref::() - .and_then(|c| left.column_statistics.get(c.index())) - .and_then(|s| s.distinct_count.get_value().copied()); - let right_ndv = right_key - .downcast_ref::() - .and_then(|c| right.column_statistics.get(c.index())) - .and_then(|s| s.distinct_count.get_value().copied()); + let left_ndv = resolve_key_ndv(left_key, left, analyzer); + let right_ndv = resolve_key_ndv(right_key, right, analyzer); match (left_ndv, right_ndv) { (Some(l), Some(r)) if l > 0 && r > 0 => { ndv_divisor = ndv_divisor.saturating_mul(l.max(r)); @@ -821,26 +831,38 @@ impl StatisticsProvider for JoinStatisticsProvider { } } - let (inner_estimate, is_exact_cartesian, join_type) = if let Some(hash_join) = - plan.downcast_ref::() - { - let est = - equi_join_estimate(hash_join.on(), left, right, left_rows, right_rows); - (est, false, *hash_join.join_type()) - } else if let Some(smj) = plan.downcast_ref::() { - let est = equi_join_estimate(smj.on(), left, right, left_rows, right_rows); - (est, false, smj.join_type()) - } 
else if plan.downcast_ref::().is_some() { - let both_exact = left.num_rows.is_exact().unwrap_or(false) - && right.num_rows.is_exact().unwrap_or(false); - ( - left_rows.saturating_mul(right_rows), - both_exact, - JoinType::Inner, - ) - } else { - return Ok(StatisticsResult::Delegate); - }; + let (inner_estimate, is_exact_cartesian, join_type) = + if let Some(hash_join) = plan.downcast_ref::() { + let est = equi_join_estimate( + hash_join.on(), + left, + right, + left_rows, + right_rows, + analyzer, + ); + (est, false, *hash_join.join_type()) + } else if let Some(smj) = plan.downcast_ref::() { + let est = equi_join_estimate( + smj.on(), + left, + right, + left_rows, + right_rows, + analyzer, + ); + (est, false, smj.join_type()) + } else if plan.downcast_ref::().is_some() { + let both_exact = left.num_rows.is_exact().unwrap_or(false) + && right.num_rows.is_exact().unwrap_or(false); + ( + left_rows.saturating_mul(right_rows), + both_exact, + JoinType::Inner, + ) + } else { + return Ok(StatisticsResult::Delegate); + }; // Apply join-type-aware cardinality bounds let estimated = match join_type { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index e5b91fbb1c5d4..3813d11e52d53 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -49,6 +49,7 @@ use datafusion_common::{DataFusionError, JoinSide, Result, internal_err}; use datafusion_execution::TaskContext; use datafusion_expr::ExpressionPlacement; use datafusion_physical_expr::equivalence::ProjectionMapping; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use datafusion_physical_expr::projection::Projector; use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql}; use datafusion_physical_expr_common::sort_expr::{ @@ -359,6 +360,28 @@ impl ExecutionPlan for ProjectionExec { Some(self.metrics.clone_inner()) } + fn uses_expression_level_statistics(&self) -> bool { + 
true + } + + fn with_expression_analyzer_registry( + &self, + registry: &Arc, + ) -> Option> { + if self.expression_analyzer_registry().is_some() { + return None; + } + let mut new_exec = self.clone(); + new_exec + .projector + .set_expression_analyzer_registry(Arc::clone(registry)); + Some(Arc::new(new_exec)) + } + + fn expression_analyzer_registry(&self) -> Option<&ExpressionAnalyzerRegistry> { + self.projector.expression_analyzer_registry() + } + fn partition_statistics(&self, partition: Option) -> Result> { let input_stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?); diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 33756e191909f..fa513ce72975e 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -177,6 +177,10 @@ impl TestContext { info!("Registering dummy async udf"); register_async_abs_udf(test_ctx.session_ctx()) } + "expression_analyzer.slt" => { + info!("Registering tables with controlled statistics"); + statistics_table::register_statistics_tables(test_ctx.session_ctx()); + } _ => { info!("Using default SessionContext"); } @@ -615,3 +619,165 @@ fn register_async_abs_udf(ctx: &SessionContext) { let udf = AsyncScalarUDF::new(Arc::new(async_abs)); ctx.register_udf(udf.into_scalar_udf()); } + +/// A table provider with fully controlled statistics for testing +/// statistics-dependent optimizer and planner behaviors. +/// +/// Unlike [`MemTable`] (which derives statistics from actual data), this +/// provider returns whatever `Statistics` you supply, letting tests exercise +/// code paths that depend on specific column NDV, min/max, or row-count values +/// without needing real data files. 
+pub mod statistics_table { + use std::sync::Arc; + + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use async_trait::async_trait; + use datafusion::catalog::Session; + use datafusion::common::tree_node::TreeNodeRecursion; + use datafusion::common::{Result, stats::Precision}; + use datafusion::datasource::{TableProvider, TableType}; + use datafusion::execution::TaskContext; + use datafusion::logical_expr::Expr; + use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr}; + use datafusion::physical_plan::{ + ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PlanProperties, SendableRecordBatchStream, Statistics, + execution_plan::{Boundedness, EmissionType}, + }; + use datafusion::prelude::SessionContext; + + /// A [`TableProvider`] and [`ExecutionPlan`] that returns user-supplied + /// statistics. Useful for testing code paths that depend on specific column + /// NDV, min/max, or row counts without requiring real data files. + #[derive(Debug, Clone)] + pub struct StatisticsTable { + schema: SchemaRef, + stats: Statistics, + cache: Arc, + } + + impl StatisticsTable { + pub fn new(schema: SchemaRef, stats: Statistics) -> Self { + assert_eq!( + schema.fields().len(), + stats.column_statistics.len(), + "column_statistics length must match schema field count" + ); + let cache = Arc::new(PlanProperties::new( + EquivalenceProperties::new(Arc::clone(&schema)), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + )); + Self { + schema, + stats, + cache, + } + } + } + + impl DisplayAs for StatisticsTable { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "StatisticsTable") + } + } + + impl ExecutionPlan for StatisticsTable { + fn name(&self) -> &'static str { + "StatisticsTable" + } + + fn properties(&self) -> &Arc { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn apply_expressions( + 
&self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, + ) -> Result { + Ok(TreeNodeRecursion::Continue) + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + datafusion::common::not_impl_err!( + "StatisticsTable is for statistics testing only" + ) + } + + fn partition_statistics( + &self, + _partition: Option, + ) -> Result> { + Ok(Arc::new(self.stats.clone())) + } + } + + #[async_trait] + impl TableProvider for StatisticsTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let schema = datafusion::common::project_schema(&self.schema, projection)?; + let stats = self.stats.clone().project(projection); + Ok(Arc::new(StatisticsTable::new(schema, stats))) + } + } + + /// Registers named [`StatisticsTable`] instances needed by SLT tests + /// that require controlled statistics (NDV, row count, min/max). + pub fn register_statistics_tables(ctx: &SessionContext) { + // t_ndv: 1000 rows, column a (Int64, NDV=10), column b (Int64, NDV=5). 
+ let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Int64, false), + ])); + let stats = Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + distinct_count: Precision::Inexact(10), + ..Default::default() + }, + ColumnStatistics { + distinct_count: Precision::Inexact(5), + ..Default::default() + }, + ], + }; + ctx.register_table("t_ndv", Arc::new(StatisticsTable::new(schema, stats))) + .expect("registering t_ndv should succeed"); + } +} diff --git a/datafusion/sqllogictest/test_files/expression_analyzer.slt b/datafusion/sqllogictest/test_files/expression_analyzer.slt new file mode 100644 index 0000000000000..59bd02e54a2eb --- /dev/null +++ b/datafusion/sqllogictest/test_files/expression_analyzer.slt @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests for ExpressionAnalyzerRegistry end-to-end integration. +# +# t_ndv is a StatisticsTable with controlled statistics: +# 1000 rows, column a (Int64, NDV=10), column b (Int64, NDV=5). 
+# +# OR predicates are not expressible as a single interval, so the built-in +# interval-arithmetic path always falls back to the default selectivity (20%). +# ExpressionAnalyzerRegistry applies inclusion-exclusion instead: +# P(a=42 OR b=5) = P(a=42) + P(b=5) - P(a=42)*P(b=5) +# = 1/10 + 1/5 - 1/50 +# = 0.1 + 0.2 - 0.02 = 0.28 +# Expected rows = round(1000 * 0.28) = 280. + +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +SET datafusion.explain.show_statistics = true; + +statement ok +SET datafusion.explain.physical_plan_only = true; + +# Without ExpressionAnalyzerRegistry: OR predicate falls back to 20% default selectivity +# → FilterExec estimated rows = 1000 * 0.20 = 200 +query TT +EXPLAIN SELECT * FROM t_ndv WHERE a = 42 OR b = 5; +---- +physical_plan +01)FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(200), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +02)--CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +03)----StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] + +# Enable ExpressionAnalyzerRegistry so inclusion-exclusion applies to OR predicates +statement ok +SET datafusion.optimizer.use_expression_analyzer = true; + +# With ExpressionAnalyzerRegistry: OR uses inclusion-exclusion +# P(a=42 OR b=5) = 1/10 + 1/5 - (1/10 * 1/5) = 0.28 → 280 rows +query TT +EXPLAIN SELECT * FROM t_ndv WHERE a = 42 OR b = 5; +---- +physical_plan +01)FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +02)--CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +03)----StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] + 
+# Verify the registry survives physical optimizer rules: ORDER BY + LIMIT triggers the +# TopK sort rule which rewrites the plan above FilterExec. The FilterExec row estimate +# must still reflect inclusion-exclusion (280), not the 20% default. +query TT +EXPLAIN SELECT * FROM t_ndv WHERE a = 42 OR b = 5 ORDER BY a LIMIT 100; +---- +physical_plan +01)SortExec: TopK(fetch=100), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false], statistics=[Rows=Inexact(100), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +02)--FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +03)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +04)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] + +# Verify the registry reaches FilterExec nodes created by optimizer rules: the +# filter_pushdown rule running on UnionExec creates fresh FilterExec nodes (one per +# branch) that never existed when the registry was first injected. Both must show +# 280 rows, confirming re-injection after each rule reaches newly created nodes. +# The UnionExec row count (560 = 2 * 280) and doubled NDVs (20, 10) also confirm +# that distinct-count propagation through UnionExec is correct. 
+query TT +EXPLAIN SELECT * FROM (SELECT * FROM t_ndv UNION ALL SELECT * FROM t_ndv) WHERE a = 42 OR b = 5; +---- +physical_plan +01)UnionExec, statistics=[Rows=Inexact(560), Bytes=Absent, [(Col[0]: Distinct=Inexact(20)),(Col[1]: Distinct=Inexact(10))]] +02)--FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +03)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +04)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +05)--FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +06)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +07)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] + +# Verify the registry reaches a FilterExec pushed through a join: filter_pushdown +# moves the WHERE clause filter to the left side of the HashJoinExec, creating a +# FilterExec that was not present in the plan at initial injection time. 
+query TT +EXPLAIN SELECT l.a, r.b FROM t_ndv l JOIN t_ndv r ON l.a = r.a WHERE l.a = 42 OR l.b = 5; +---- +physical_plan +01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)], projection=[a@0, b@2], statistics=[Rows=Inexact(28000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +02)--FilterExec: a@0 = 42 OR b@1 = 5, projection=[a@0], statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10))]] +03)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +04)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +05)--CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] +06)----StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]] + +statement ok +SET datafusion.optimizer.use_expression_analyzer = false; + +statement ok +SET datafusion.explain.show_statistics = false; + +statement ok +SET datafusion.explain.physical_plan_only = false; + +statement ok +SET datafusion.execution.target_partitions = 4; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b04c78bd2774c..5ced3d823b22d 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -331,6 +331,7 @@ datafusion.optimizer.repartition_windows true datafusion.optimizer.skip_failed_rules false datafusion.optimizer.subset_repartition_threshold 4 datafusion.optimizer.top_down_join_key_reordering true +datafusion.optimizer.use_expression_analyzer false datafusion.optimizer.use_statistics_registry false datafusion.runtime.list_files_cache_limit 1M datafusion.runtime.list_files_cache_ttl NULL @@ -478,7 
+479,8 @@ datafusion.optimizer.repartition_windows true Should DataFusion repartition data datafusion.optimizer.skip_failed_rules false When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail datafusion.optimizer.subset_repartition_threshold 4 Partition count threshold for subset satisfaction optimization. When the current partition count is >= this threshold, DataFusion will skip repartitioning if the required partitioning expression is a subset of the current partition expression such as Hash(a) satisfies Hash(a, b). When the current partition count is < this threshold, DataFusion will repartition to increase parallelism even when subset satisfaction applies. Set to 0 to always repartition (disable subset satisfaction optimization). Set to a high value to always use subset satisfaction. Example (subset_repartition_threshold = 4): ```text Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a]) If current partitions (3) < threshold (4), repartition: AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)] RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3 AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3) If current partitions (8) >= threshold (4), use subset satisfaction: AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8) ``` datafusion.optimizer.top_down_join_key_reordering true When set to true, the physical plan optimizer will run a top down process to reorder the join keys -datafusion.optimizer.use_statistics_registry false When set to true, the physical plan optimizer uses the pluggable `StatisticsRegistry` for statistics propagation across operators. 
This enables more accurate cardinality estimates compared to each operator's built-in `partition_statistics`. +datafusion.optimizer.use_expression_analyzer false When set to true, the pluggable `ExpressionAnalyzerRegistry` from `SessionState` is used for expression-level statistics estimation (NDV, selectivity, min/max, null fraction) in physical plan operators. +datafusion.optimizer.use_statistics_registry false When set to true, the physical plan optimizer uses the pluggable `StatisticsRegistry` for a bottom-up statistics walk across operators, enabling more accurate cardinality estimates. Enabling `use_expression_analyzer` alongside this flag gives built-in providers access to custom expression-level analyzers (NDV, selectivity) for the operators they process. datafusion.runtime.list_files_cache_limit 1M Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. datafusion.runtime.list_files_cache_ttl NULL TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes. datafusion.runtime.max_temp_directory_size 100G Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 49c9eea29ef73..7726dc77c71f4 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -146,6 +146,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. | | datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. 
| | datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | +| datafusion.optimizer.use_expression_analyzer | false | When set to true, the pluggable `ExpressionAnalyzerRegistry` from `SessionState` is used for expression-level statistics estimation (NDV, selectivity, min/max, null fraction) in physical plan operators. | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. 
| @@ -161,7 +162,7 @@ The following configuration settings are available: | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | | datafusion.optimizer.join_reordering | true | When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used. | -| datafusion.optimizer.use_statistics_registry | false | When set to true, the physical plan optimizer uses the pluggable `StatisticsRegistry` for statistics propagation across operators. This enables more accurate cardinality estimates compared to each operator's built-in `partition_statistics`. | +| datafusion.optimizer.use_statistics_registry | false | When set to true, the physical plan optimizer uses the pluggable `StatisticsRegistry` for a bottom-up statistics walk across operators, enabling more accurate cardinality estimates. Enabling `use_expression_analyzer` alongside this flag gives built-in providers access to custom expression-level analyzers (NDV, selectivity) for the operators they process. | | datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | | datafusion.optimizer.enable_piecewise_merge_join | false | When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. | | datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |