diff --git a/datafusion/functions-aggregate/src/sum.rs b/datafusion/functions-aggregate/src/sum.rs
index 81efea1df22b1..2301d8a8fb621 100644
--- a/datafusion/functions-aggregate/src/sum.rs
+++ b/datafusion/functions-aggregate/src/sum.rs
@@ -208,6 +208,29 @@ impl AggregateUDFImpl for Sum {
         &self.signature
     }
 
+    /// Coerces the `SUM` argument to the type the aggregation runs on.
+    ///
+    /// Dictionary-encoded input is coerced via its value type. Narrow signed
+    /// integers widen to `Int64`, narrow unsigned integers to `UInt64` and
+    /// `Float32` to `Float64`; every other value type is returned as-is.
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+        // Peel off every (possibly nested) dictionary layer first.
+        let mut arg = &arg_types[0];
+        while let DataType::Dictionary(_, value_type) = arg {
+            arg = value_type.as_ref();
+        }
+        let coerced = match arg {
+            DataType::Int8 | DataType::Int16 | DataType::Int32 => DataType::Int64,
+            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 => DataType::UInt64,
+            DataType::Float32 => DataType::Float64,
+            // Return the *unwrapped* type rather than `arg_types` itself so
+            // that e.g. `Dictionary(_, Int64)` coerces to `Int64` instead of
+            // staying dictionary-encoded.
+            other => other.clone(),
+        };
+        Ok(vec![coerced])
+    }
+
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
         match &arg_types[0] {
             DataType::Int64 => Ok(DataType::Int64),
diff --git a/datafusion/optimizer/src/decompose_aggregate.rs b/datafusion/optimizer/src/decompose_aggregate.rs
new file mode 100644
index 0000000000000..4347431db14e8
--- /dev/null
+++ b/datafusion/optimizer/src/decompose_aggregate.rs
@@ -0,0 +1,536 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`DecomposeAggregate`] rewrites `AVG(x)` into `SUM(x) / COUNT(x)`, using
+//! `COUNT(*)` when `x` is non-nullable so that the count can be shared
+//! with other aggregates via common subexpression elimination.
+
+use std::sync::Arc;
+
+use crate::optimizer::ApplyOrder;
+use crate::{OptimizerConfig, OptimizerRule};
+
+use arrow::datatypes::DataType;
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
+use datafusion_common::{Column, DataFusionError, Result};
+use datafusion_expr::builder::project;
+use datafusion_expr::expr::AggregateFunctionParams;
+use datafusion_expr::expr_fn::cast;
+use datafusion_expr::{
+    Expr, ExprSchemable, col,
+    expr::AggregateFunction,
+    logical_plan::{Aggregate, LogicalPlan},
+};
+
+/// Rewrites `AVG(x)` aggregate functions into `SUM(x) / COUNT(*)`.
+///
+/// ```text
+/// Before:
+///   Aggregate: groupBy=[[g]], aggr=[[SUM(a), AVG(b), COUNT(*)]]
+///
+/// After:
+///   Projection: g, SUM(a), SUM(b) / CAST(COUNT(*) AS Float64) AS AVG(b), COUNT(*)
+///     Aggregate: groupBy=[[g]], aggr=[[SUM(a), SUM(b), COUNT(*)]]
+/// ```
+///
+/// This reduces accumulator overhead (AVG stores sum + count per group
+/// internally). For non-nullable `x` the count is `COUNT(*)`, shareable via
+/// `CommonSubexprEliminate`; nullable `x` falls back to `COUNT(x)`.
+///
+/// Only applies to `AVG` with Float64 return type (the common case for
+/// integer/float columns after type coercion). Skips DISTINCT, filtered,
+/// and ordered AVGs, as well as decimal/duration/interval types.
+#[derive(Default, Debug)]
+pub struct DecomposeAggregate {}
+
+impl DecomposeAggregate {
+    /// Creates a new [`DecomposeAggregate`] optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Returns true if this is a simple AVG (no DISTINCT/FILTER/ORDER BY) returning Float64.
+fn is_eligible_avg(
+    expr: &Expr,
+    schema: &datafusion_common::DFSchema,
+    field_idx: usize,
+) -> bool {
+    let Expr::AggregateFunction(AggregateFunction { func, params }) = expr else {
+        return false;
+    };
+    // Decorated AVGs (DISTINCT, FILTER, ORDER BY, IGNORE/RESPECT NULLS) are
+    // left alone: the rewritten SUM/COUNT pair would silently drop them.
+    func.name() == "avg"
+        && !params.distinct
+        && params.filter.is_none()
+        && params.order_by.is_empty()
+        && params.null_treatment.is_none()
+        && *schema.field(field_idx).data_type() == DataType::Float64
+}
+
+impl OptimizerRule for DecomposeAggregate {
+    fn name(&self) -> &str {
+        "decompose_aggregate"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    /// Replaces an `Aggregate` containing eligible `AVG`s with a `Projection`
+    /// over an `Aggregate` that computes `SUM` and `COUNT` instead. Plans
+    /// that are not eligible are returned unchanged.
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
+        // Run all bail-out checks on a borrow so the original plan can be
+        // returned as-is, without fallibly rebuilding the `Aggregate` node.
+        let LogicalPlan::Aggregate(agg) = &plan else {
+            return Ok(Transformed::no(plan));
+        };
+        let group_size = agg.group_expr.len();
+
+        // Skip GroupingSet aggregates (ROLLUP/CUBE/GROUPING SETS) — the
+        // group_expr expands to more schema fields than group_expr.len(),
+        // which breaks our index arithmetic.
+        let has_grouping_set = agg
+            .group_expr
+            .iter()
+            .any(|e| matches!(e, Expr::GroupingSet(_)));
+
+        // Quick check: any eligible AVGs?
+        let has_avg = agg
+            .aggr_expr
+            .iter()
+            .enumerate()
+            .any(|(i, e)| is_eligible_avg(e, &agg.schema, group_size + i));
+        if has_grouping_set || !has_avg {
+            return Ok(Transformed::no(plan));
+        }
+
+        // We need the function registry to look up sum / count UDAFs. The
+        // rewrite is only an optimization, so a missing registry or function
+        // leaves the plan untouched instead of failing the optimizer.
+        let Some(registry) = config.function_registry() else {
+            return Ok(Transformed::no(plan));
+        };
+        let (Ok(sum_udaf), Ok(count_udaf)) =
+            (registry.udaf("sum"), registry.udaf("count"))
+        else {
+            return Ok(Transformed::no(plan));
+        };
+
+        let LogicalPlan::Aggregate(Aggregate {
+            input,
+            aggr_expr,
+            schema,
+            group_expr,
+            ..
+        }) = plan
+        else {
+            unreachable!("`plan` was matched as an Aggregate above");
+        };
+
+        let mut new_aggr_exprs = Vec::new();
+        let mut alias_idx = 0usize;
+
+        enum AggrMapping {
+            AvgRewrite {
+                sum_alias: String,
+                count_alias: String,
+            },
+            PassThrough(String),
+        }
+        let mut mappings: Vec<AggrMapping> = Vec::new();
+
+        // COUNT(*) expression — CSE will deduplicate if one already exists.
+        let count_star = Expr::AggregateFunction(AggregateFunction::new_udf(
+            Arc::clone(&count_udaf),
+            vec![Expr::Literal(COUNT_STAR_EXPANSION, None)],
+            false,
+            None,
+            vec![],
+            None,
+        ));
+
+        let input_schema = input.schema();
+
+        for (idx, expr) in aggr_expr.into_iter().enumerate() {
+            let field_idx = group_size + idx;
+
+            if is_eligible_avg(&expr, &schema, field_idx) {
+                // Extract args from the AVG expression.
+                let args = match expr {
+                    Expr::AggregateFunction(AggregateFunction {
+                        params: AggregateFunctionParams { args, .. },
+                        ..
+                    }) => args,
+                    _ => unreachable!(),
+                };
+
+                let sum_alias = format!("__decompose_{alias_idx}");
+                alias_idx += 1;
+                let count_alias = format!("__decompose_{alias_idx}");
+                alias_idx += 1;
+
+                // Strip CAST(x AS Float64) when the inner type is one SUM
+                // handles natively — avoids precision loss for Int64/UInt64.
+                // NOTE(review): this makes SUM accumulate in the integer type,
+                // which changes results (see the clickbench AVG("UserID")
+                // expectation) and presumably can overflow — confirm intended.
+                let sum_args: Vec<Expr> = args
+                    .iter()
+                    .map(|a| match a {
+                        Expr::Cast(c) if *c.field.data_type() == DataType::Float64 => {
+                            match c.expr.get_type(input_schema.as_ref()).ok().as_ref() {
+                                Some(
+                                    DataType::Int64
+                                    | DataType::UInt64
+                                    | DataType::Float64,
+                                ) => (*c.expr).clone(),
+                                _ => a.clone(),
+                            }
+                        }
+                        other => other.clone(),
+                    })
+                    .collect();
+
+                let sum_expr = Expr::AggregateFunction(AggregateFunction::new_udf(
+                    Arc::clone(&sum_udaf),
+                    sum_args,
+                    false,
+                    None,
+                    vec![],
+                    None,
+                ))
+                .alias(&sum_alias);
+
+                // Use COUNT(*) when the arg is non-nullable (can share with
+                // existing COUNT(*) via CSE). Fall back to COUNT(x) for
+                // nullable args since AVG ignores NULLs.
+                let arg_nullable =
+                    args[0].nullable(input_schema.as_ref()).unwrap_or(true);
+                let count_expr = if arg_nullable {
+                    Expr::AggregateFunction(AggregateFunction::new_udf(
+                        Arc::clone(&count_udaf),
+                        args,
+                        false,
+                        None,
+                        vec![],
+                        None,
+                    ))
+                } else {
+                    count_star.clone()
+                }
+                .alias(&count_alias);
+
+                new_aggr_exprs.push(sum_expr);
+                new_aggr_exprs.push(count_expr);
+                mappings.push(AggrMapping::AvgRewrite {
+                    sum_alias,
+                    count_alias,
+                });
+            } else {
+                let pt_alias = format!("__decompose_{alias_idx}");
+                alias_idx += 1;
+                new_aggr_exprs.push(expr.alias(&pt_alias));
+                mappings.push(AggrMapping::PassThrough(pt_alias));
+            }
+        }
+
+        // Inner Aggregate with rewritten expressions
+        let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new(
+            input,
+            group_expr,
+            new_aggr_exprs,
+        )?);
+
+        // Projection that restores the original schema
+        let mut proj_exprs = Vec::new();
+
+        // Group-by columns
+        let inner_schema = inner_agg.schema();
+        for i in 0..group_size {
+            let (qualifier, field) = schema.qualified_field(i);
+            let (inner_qual, inner_field) = inner_schema.qualified_field(i);
+            let col_ref =
+                Expr::Column(Column::new(inner_qual.cloned(), inner_field.name()));
+            if qualifier != inner_qual || field.name() != inner_field.name() {
+                proj_exprs
+                    .push(col_ref.alias_qualified(qualifier.cloned(), field.name()));
+            } else {
+                proj_exprs.push(col_ref);
+            }
+        }
+
+        // Aggregate results
+        for (mapping_idx, mapping) in mappings.into_iter().enumerate() {
+            let orig_idx = group_size + mapping_idx;
+            let (qualifier, field) = schema.qualified_field(orig_idx);
+
+            match mapping {
+                AggrMapping::AvgRewrite {
+                    sum_alias,
+                    count_alias,
+                } => {
+                    let avg_expr = cast(col(&sum_alias), DataType::Float64)
+                        / cast(col(&count_alias), DataType::Float64);
+                    proj_exprs
+                        .push(avg_expr.alias_qualified(qualifier.cloned(), field.name()));
+                }
+                AggrMapping::PassThrough(alias) => {
+                    proj_exprs.push(
+                        col(alias).alias_qualified(qualifier.cloned(), field.name()),
+                    );
+                }
+            }
+        }
+
+        Ok(Transformed::yes(project(inner_agg, proj_exprs)?))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::OptimizerContext;
+    use crate::test::*;
+    use datafusion_common::config::ConfigOptions;
+    use datafusion_expr::logical_plan::builder::LogicalPlanBuilder;
+    use datafusion_expr::registry::{FunctionRegistry, MemoryFunctionRegistry};
+    use datafusion_expr::{col, lit};
+    use datafusion_functions_aggregate::average::{avg, avg_distinct, avg_udaf};
+    use datafusion_functions_aggregate::count::count_udaf;
+    use datafusion_functions_aggregate::expr_fn::{count, sum};
+    use datafusion_functions_aggregate::sum::sum_udaf;
+
+    use chrono::Utc;
+    use datafusion_common::alias::AliasGenerator;
+
+    /// An OptimizerConfig that includes a function registry with sum/count.
+    struct TestConfig {
+        inner: OptimizerContext,
+        registry: MemoryFunctionRegistry,
+    }
+
+    impl TestConfig {
+        fn new() -> Self {
+            let mut registry = MemoryFunctionRegistry::new();
+            registry.register_udaf(sum_udaf()).unwrap();
+            registry.register_udaf(count_udaf()).unwrap();
+            Self {
+                inner: OptimizerContext::new(),
+                registry,
+            }
+        }
+    }
+
+    impl OptimizerConfig for TestConfig {
+        fn query_execution_start_time(&self) -> Option<chrono::DateTime<Utc>> {
+            self.inner.query_execution_start_time()
+        }
+        fn alias_generator(&self) -> &Arc<AliasGenerator> {
+            self.inner.alias_generator()
+        }
+        fn options(&self) -> Arc<ConfigOptions> {
+            self.inner.options()
+        }
+        fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
+            Some(&self.registry)
+        }
+    }
+
+    macro_rules! assert_optimized_plan_equal {
+        (
+            $plan:expr,
+            @ $expected:literal $(,)?
+        ) => {{
+            let optimizer = $crate::Optimizer::with_rules(vec![
+                Arc::new(DecomposeAggregate::new()),
+            ]);
+            let config = TestConfig::new();
+            let optimized_plan = optimizer
+                .optimize($plan, &config, |_, _| {})
+                .expect("failed to optimize plan");
+            let formatted_plan = optimized_plan.display_indent_schema();
+            insta::assert_snapshot!(formatted_plan, @ $expected);
+            Ok::<(), datafusion_common::DataFusionError>(())
+        }};
+    }
+
+    #[test]
+    fn no_avg() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![sum(col("b")), count(col("c"))])?
+            .build()?;
+
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b), count(test.c)]] [a:UInt32, sum(test.b):UInt64;N, count(test.c):Int64]
+          TableScan: test [a:UInt32, b:UInt32, c:UInt32]
+        "
+        )
+    }
+
+    #[test]
+    fn simple_avg() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![avg(col("b"))])?
+            .build()?;
+
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Projection: test.a, CAST(__decompose_0 AS Float64) / CAST(__decompose_1 AS Float64) AS avg(test.b) [a:UInt32, avg(test.b):Float64;N]
+          Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b) AS __decompose_0, count(Int64(1)) AS __decompose_1]] [a:UInt32, __decompose_0:UInt64;N, __decompose_1:Int64]
+            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
+        "
+        )
+    }
+
+    #[test]
+    fn avg_with_other_aggregates() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(
+                vec![col("a")],
+                vec![sum(col("b")), avg(col("c")), count(col("b"))],
+            )?
+            .build()?;
+
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Projection: test.a, __decompose_0 AS sum(test.b), CAST(__decompose_1 AS Float64) / CAST(__decompose_2 AS Float64) AS avg(test.c), __decompose_3 AS count(test.b) [a:UInt32, sum(test.b):UInt64;N, avg(test.c):Float64;N, count(test.b):Int64]
+          Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b) AS __decompose_0, sum(test.c) AS __decompose_1, count(Int64(1)) AS __decompose_2, count(test.b) AS __decompose_3]] [a:UInt32, __decompose_0:UInt64;N, __decompose_1:UInt64;N, __decompose_2:Int64, __decompose_3:Int64]
+            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
+        "
+        )
+    }
+
+    #[test]
+    fn avg_distinct_not_decomposed() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![avg_distinct(col("b"))])?
+            .build()?;
+
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Aggregate: groupBy=[[test.a]], aggr=[[avg(DISTINCT test.b)]] [a:UInt32, avg(DISTINCT test.b):Float64;N]
+          TableScan: test [a:UInt32, b:UInt32, c:UInt32]
+        "
+        )
+    }
+
+    #[test]
+    fn avg_with_filter_not_decomposed() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        use datafusion_expr::ExprFunctionExt;
+
+        let avg_filtered = avg_udaf()
+            .call(vec![col("b")])
+            .filter(col("a").gt(lit(5)))
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![avg_filtered])?
+            .build()?;
+
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Aggregate: groupBy=[[test.a]], aggr=[[avg(test.b) FILTER (WHERE test.a > Int32(5))]] [a:UInt32, avg(test.b) FILTER (WHERE test.a > Int32(5)):Float64;N]
+          TableScan: test [a:UInt32, b:UInt32, c:UInt32]
+        "
+        )
+    }
+
+    #[test]
+    fn multiple_avgs() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![avg(col("b")), avg(col("c"))])?
+            .build()?;
+
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Projection: test.a, CAST(__decompose_0 AS Float64) / CAST(__decompose_1 AS Float64) AS avg(test.b), CAST(__decompose_2 AS Float64) / CAST(__decompose_3 AS Float64) AS avg(test.c) [a:UInt32, avg(test.b):Float64;N, avg(test.c):Float64;N]
+          Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b) AS __decompose_0, count(Int64(1)) AS __decompose_1, sum(test.c) AS __decompose_2, count(Int64(1)) AS __decompose_3]] [a:UInt32, __decompose_0:UInt64;N, __decompose_1:Int64, __decompose_2:UInt64;N, __decompose_3:Int64]
+            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
+        "
+        )
+    }
+
+    #[test]
+    fn no_group_by() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(Vec::<Expr>::new(), vec![avg(col("b"))])?
+            .build()?;
+
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Projection: CAST(__decompose_0 AS Float64) / CAST(__decompose_1 AS Float64) AS avg(test.b) [avg(test.b):Float64;N]
+          Aggregate: groupBy=[[]], aggr=[[sum(test.b) AS __decompose_0, count(Int64(1)) AS __decompose_1]] [__decompose_0:UInt64;N, __decompose_1:Int64]
+            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
+        "
+        )
+    }
+
+    #[test]
+    fn grouping_set_not_decomposed() -> Result<()> {
+        use datafusion_expr::expr::GroupingSet;
+
+        let table_scan = test_table_scan()?;
+
+        let rollup = Expr::GroupingSet(GroupingSet::Rollup(vec![col("a"), col("b")]));
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![rollup], vec![avg(col("c"))])?
+            .build()?;
+
+        // ROLLUP aggregates should not be decomposed
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Aggregate: groupBy=[[ROLLUP (test.a, test.b)]], aggr=[[avg(test.c)]] [a:UInt32;N, b:UInt32;N, __grouping_id:UInt8, avg(test.c):Float64;N]
+          TableScan: test [a:UInt32, b:UInt32, c:UInt32]
+        "
+        )
+    }
+}
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index e610091824092..73dded9f7f143 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -40,6 +40,7 @@
 //! 
[`TypeCoercion`]: analyzer::type_coercion::TypeCoercion pub mod analyzer; pub mod common_subexpr_eliminate; +pub mod decompose_aggregate; pub mod decorrelate; pub mod decorrelate_lateral_join; pub mod decorrelate_predicate_subquery; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index bdea6a83072cd..372da29abbac3 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -33,6 +33,7 @@ use datafusion_common::{DFSchema, DataFusionError, HashSet, Result, internal_err use datafusion_expr::logical_plan::LogicalPlan; use crate::common_subexpr_eliminate::CommonSubexprEliminate; +use crate::decompose_aggregate::DecomposeAggregate; use crate::decorrelate_lateral_join::DecorrelateLateralJoin; use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery; use crate::eliminate_cross_join::EliminateCrossJoin; @@ -297,6 +298,7 @@ impl Optimizer { // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit Arc::new(PushDownLimit::new()), Arc::new(PushDownFilter::new()), + Arc::new(DecomposeAggregate::new()), Arc::new(SingleDistinctToGroupBy::new()), // The previous optimizations added expressions and projections, // that might benefit from the following rules diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 02323671638c7..9dfcf09e3a271 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -8587,14 +8587,16 @@ ORDER BY g; logical_plan 01)Sort: stream_test.g ASC NULLS LAST 02)--Projection: stream_test.g, count(Int64(1)) AS count(*), sum(stream_test.x), avg(stream_test.x), avg(stream_test.x) AS mean(stream_test.x), min(stream_test.x), max(stream_test.y), bit_and(stream_test.i), bit_or(stream_test.i), bit_xor(stream_test.i), bool_and(stream_test.b), bool_or(stream_test.b), median(stream_test.x), Int32(0) AS 
grouping(stream_test.g), var(stream_test.x), var(stream_test.x) AS var_samp(stream_test.x), var_pop(stream_test.x), var(stream_test.x) AS var_sample(stream_test.x), var_pop(stream_test.x) AS var_population(stream_test.x), stddev(stream_test.x), stddev(stream_test.x) AS stddev_samp(stream_test.x), stddev_pop(stream_test.x) -03)----Aggregate: groupBy=[[stream_test.g]], aggr=[[count(Int64(1)), sum(stream_test.x), avg(stream_test.x), min(stream_test.x), max(stream_test.y), bit_and(stream_test.i), bit_or(stream_test.i), bit_xor(stream_test.i), bool_and(stream_test.b), bool_or(stream_test.b), median(stream_test.x), var(stream_test.x), var_pop(stream_test.x), stddev(stream_test.x), stddev_pop(stream_test.x)]] -04)------Sort: stream_test.g ASC NULLS LAST, fetch=10000 -05)--------TableScan: stream_test projection=[g, x, y, i, b] +03)----Projection: stream_test.g, __decompose_0 AS count(Int64(1)), __common_expr_1 AS sum(stream_test.x), CAST(__common_expr_1 AS Float64) / CAST(__decompose_3 AS Float64) AS avg(stream_test.x), __decompose_4 AS min(stream_test.x), __decompose_5 AS max(stream_test.y), __decompose_6 AS bit_and(stream_test.i), __decompose_7 AS bit_or(stream_test.i), __decompose_8 AS bit_xor(stream_test.i), __decompose_9 AS bool_and(stream_test.b), __decompose_10 AS bool_or(stream_test.b), __decompose_11 AS median(stream_test.x), __decompose_12 AS var(stream_test.x), __decompose_13 AS var_pop(stream_test.x), __decompose_14 AS stddev(stream_test.x), __decompose_15 AS stddev_pop(stream_test.x) +04)------Aggregate: groupBy=[[stream_test.g]], aggr=[[sum(stream_test.x) AS __common_expr_1, count(Int64(1)) AS __decompose_0, count(stream_test.x) AS __decompose_3, min(stream_test.x) AS __decompose_4, max(stream_test.y) AS __decompose_5, bit_and(stream_test.i) AS __decompose_6, bit_or(stream_test.i) AS __decompose_7, bit_xor(stream_test.i) AS __decompose_8, bool_and(stream_test.b) AS __decompose_9, bool_or(stream_test.b) AS __decompose_10, median(stream_test.x) AS 
__decompose_11, var(stream_test.x) AS __decompose_12, var_pop(stream_test.x) AS __decompose_13, stddev(stream_test.x) AS __decompose_14, stddev_pop(stream_test.x) AS __decompose_15]] +05)--------Sort: stream_test.g ASC NULLS LAST, fetch=10000 +06)----------TableScan: stream_test projection=[g, x, y, i, b] physical_plan 01)ProjectionExec: expr=[g@0 as g, count(Int64(1))@1 as count(*), sum(stream_test.x)@2 as sum(stream_test.x), avg(stream_test.x)@3 as avg(stream_test.x), avg(stream_test.x)@3 as mean(stream_test.x), min(stream_test.x)@4 as min(stream_test.x), max(stream_test.y)@5 as max(stream_test.y), bit_and(stream_test.i)@6 as bit_and(stream_test.i), bit_or(stream_test.i)@7 as bit_or(stream_test.i), bit_xor(stream_test.i)@8 as bit_xor(stream_test.i), bool_and(stream_test.b)@9 as bool_and(stream_test.b), bool_or(stream_test.b)@10 as bool_or(stream_test.b), median(stream_test.x)@11 as median(stream_test.x), 0 as grouping(stream_test.g), var(stream_test.x)@12 as var(stream_test.x), var(stream_test.x)@12 as var_samp(stream_test.x), var_pop(stream_test.x)@13 as var_pop(stream_test.x), var(stream_test.x)@12 as var_sample(stream_test.x), var_pop(stream_test.x)@13 as var_population(stream_test.x), stddev(stream_test.x)@14 as stddev(stream_test.x), stddev(stream_test.x)@14 as stddev_samp(stream_test.x), stddev_pop(stream_test.x)@15 as stddev_pop(stream_test.x)] -02)--AggregateExec: mode=Single, gby=[g@0 as g], aggr=[count(Int64(1)), sum(stream_test.x), avg(stream_test.x), min(stream_test.x), max(stream_test.y), bit_and(stream_test.i), bit_or(stream_test.i), bit_xor(stream_test.i), bool_and(stream_test.b), bool_or(stream_test.b), median(stream_test.x), var(stream_test.x), var_pop(stream_test.x), stddev(stream_test.x), stddev_pop(stream_test.x)], ordering_mode=Sorted -03)----SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST], preserve_partitioning=[false] -04)------DataSourceExec: partitions=1, partition_sizes=[1] +02)--ProjectionExec: expr=[g@0 as g, __decompose_0@2 as 
count(Int64(1)), __common_expr_1@1 as sum(stream_test.x), __common_expr_1@1 / CAST(__decompose_3@3 AS Float64) as avg(stream_test.x), __decompose_4@4 as min(stream_test.x), __decompose_5@5 as max(stream_test.y), __decompose_6@6 as bit_and(stream_test.i), __decompose_7@7 as bit_or(stream_test.i), __decompose_8@8 as bit_xor(stream_test.i), __decompose_9@9 as bool_and(stream_test.b), __decompose_10@10 as bool_or(stream_test.b), __decompose_11@11 as median(stream_test.x), __decompose_12@12 as var(stream_test.x), __decompose_13@13 as var_pop(stream_test.x), __decompose_14@14 as stddev(stream_test.x), __decompose_15@15 as stddev_pop(stream_test.x)] +03)----AggregateExec: mode=Single, gby=[g@0 as g], aggr=[__common_expr_1, __decompose_0, __decompose_3, __decompose_4, __decompose_5, __decompose_6, __decompose_7, __decompose_8, __decompose_9, __decompose_10, __decompose_11, __decompose_12, __decompose_13, __decompose_14, __decompose_15], ordering_mode=Sorted +04)------SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: partitions=1, partition_sizes=[1] query IIRRRRRIIIBBRIRRRRRRRR SELECT diff --git a/datafusion/sqllogictest/test_files/clickbench.slt b/datafusion/sqllogictest/test_files/clickbench.slt index 314c3f9736e90..a0d88e95858c4 100644 --- a/datafusion/sqllogictest/test_files/clickbench.slt +++ b/datafusion/sqllogictest/test_files/clickbench.slt @@ -101,14 +101,15 @@ query TT EXPLAIN SELECT SUM("AdvEngineID"), COUNT(*), AVG("ResolutionWidth") FROM hits; ---- logical_plan -01)Projection: sum(hits.AdvEngineID), count(Int64(1)) AS count(*), avg(hits.ResolutionWidth) -02)--Aggregate: groupBy=[[]], aggr=[[sum(CAST(hits.AdvEngineID AS Int64)), count(Int64(1)), avg(CAST(hits.ResolutionWidth AS Float64))]] -03)----SubqueryAlias: hits -04)------TableScan: hits_raw projection=[ResolutionWidth, AdvEngineID] +01)Projection: __decompose_0 AS sum(hits.AdvEngineID), __decompose_1 AS count(*), CAST(__decompose_2 AS 
Float64) / CAST(__decompose_3 AS Float64) AS avg(hits.ResolutionWidth) +02)--Aggregate: groupBy=[[]], aggr=[[sum(CAST(hits.AdvEngineID AS Int64)) AS __decompose_0, count(Int64(1)) AS __decompose_1, sum(__common_expr_1) AS __decompose_2, count(__common_expr_1) AS __decompose_3]] +03)----Projection: CAST(hits.ResolutionWidth AS Float64) AS __common_expr_1, hits.AdvEngineID +04)------SubqueryAlias: hits +05)--------TableScan: hits_raw projection=[ResolutionWidth, AdvEngineID] physical_plan -01)ProjectionExec: expr=[sum(hits.AdvEngineID)@0 as sum(hits.AdvEngineID), count(Int64(1))@1 as count(*), avg(hits.ResolutionWidth)@2 as avg(hits.ResolutionWidth)] -02)--AggregateExec: mode=Single, gby=[], aggr=[sum(hits.AdvEngineID), count(Int64(1)), avg(hits.ResolutionWidth)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[ResolutionWidth, AdvEngineID], file_type=parquet +01)ProjectionExec: expr=[__decompose_0@0 as sum(hits.AdvEngineID), __decompose_1@1 as count(*), __decompose_2@2 / CAST(__decompose_3@3 AS Float64) as avg(hits.ResolutionWidth)] +02)--AggregateExec: mode=Single, gby=[], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[CAST(ResolutionWidth@20 AS Float64) as __common_expr_1, AdvEngineID], file_type=parquet query IIR SELECT SUM("AdvEngineID"), COUNT(*), AVG("ResolutionWidth") FROM hits; @@ -120,17 +121,19 @@ query TT EXPLAIN SELECT AVG("UserID") FROM hits; ---- logical_plan -01)Aggregate: groupBy=[[]], aggr=[[avg(CAST(hits.UserID AS Float64))]] -02)--SubqueryAlias: hits -03)----TableScan: hits_raw projection=[UserID] +01)Projection: CAST(__decompose_0 AS Float64) / CAST(__decompose_1 AS Float64) AS avg(hits.UserID) +02)--Aggregate: groupBy=[[]], aggr=[[sum(hits.UserID) AS __decompose_0, count(CAST(hits.UserID AS Float64)) AS 
__decompose_1]] +03)----SubqueryAlias: hits +04)------TableScan: hits_raw projection=[UserID] physical_plan -01)AggregateExec: mode=Single, gby=[], aggr=[avg(hits.UserID)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[UserID], file_type=parquet +01)ProjectionExec: expr=[CAST(__decompose_0@0 AS Float64) / CAST(__decompose_1@1 AS Float64) as avg(hits.UserID)] +02)--AggregateExec: mode=Single, gby=[], aggr=[__decompose_0, __decompose_1] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[UserID], file_type=parquet query R SELECT AVG("UserID") FROM hits; ---- --304548765855551740 +-304548765855551600 ## Q4 query TT @@ -263,18 +266,19 @@ EXPLAIN SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWid ---- logical_plan 01)Sort: c DESC NULLS FIRST, fetch=10 -02)--Projection: hits.RegionID, sum(hits.AdvEngineID), count(Int64(1)) AS count(*) AS c, avg(hits.ResolutionWidth), count(DISTINCT hits.UserID) -03)----Aggregate: groupBy=[[hits.RegionID]], aggr=[[sum(CAST(hits.AdvEngineID AS Int64)), count(Int64(1)), avg(CAST(hits.ResolutionWidth AS Float64)), count(DISTINCT hits.UserID)]] -04)------SubqueryAlias: hits -05)--------TableScan: hits_raw projection=[RegionID, UserID, ResolutionWidth, AdvEngineID] +02)--Projection: hits.RegionID, __decompose_0 AS sum(hits.AdvEngineID), __decompose_1 AS c, CAST(__decompose_2 AS Float64) / CAST(__decompose_3 AS Float64) AS avg(hits.ResolutionWidth), __decompose_4 AS count(DISTINCT hits.UserID) +03)----Aggregate: groupBy=[[hits.RegionID]], aggr=[[sum(CAST(hits.AdvEngineID AS Int64)) AS __decompose_0, count(Int64(1)) AS __decompose_1, sum(__common_expr_1) AS __decompose_2, count(__common_expr_1) AS __decompose_3, count(DISTINCT hits.UserID) AS __decompose_4]] +04)------Projection: CAST(hits.ResolutionWidth AS Float64) AS __common_expr_1, hits.RegionID, 
hits.UserID, hits.AdvEngineID +05)--------SubqueryAlias: hits +06)----------TableScan: hits_raw projection=[RegionID, UserID, ResolutionWidth, AdvEngineID] physical_plan 01)SortPreservingMergeExec: [c@2 DESC], fetch=10 02)--SortExec: TopK(fetch=10), expr=[c@2 DESC], preserve_partitioning=[true] -03)----ProjectionExec: expr=[RegionID@0 as RegionID, sum(hits.AdvEngineID)@1 as sum(hits.AdvEngineID), count(Int64(1))@2 as c, avg(hits.ResolutionWidth)@3 as avg(hits.ResolutionWidth), count(DISTINCT hits.UserID)@4 as count(DISTINCT hits.UserID)] -04)------AggregateExec: mode=FinalPartitioned, gby=[RegionID@0 as RegionID], aggr=[sum(hits.AdvEngineID), count(Int64(1)), avg(hits.ResolutionWidth), count(DISTINCT hits.UserID)] +03)----ProjectionExec: expr=[RegionID@0 as RegionID, __decompose_0@1 as sum(hits.AdvEngineID), __decompose_1@2 as c, __decompose_2@3 / CAST(__decompose_3@4 AS Float64) as avg(hits.ResolutionWidth), __decompose_4@5 as count(DISTINCT hits.UserID)] +04)------AggregateExec: mode=FinalPartitioned, gby=[RegionID@0 as RegionID], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3, __decompose_4] 05)--------RepartitionExec: partitioning=Hash([RegionID@0], 4), input_partitions=1 -06)----------AggregateExec: mode=Partial, gby=[RegionID@0 as RegionID], aggr=[sum(hits.AdvEngineID), count(Int64(1)), avg(hits.ResolutionWidth), count(DISTINCT hits.UserID)] -07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[RegionID, UserID, ResolutionWidth, AdvEngineID], file_type=parquet +06)----------AggregateExec: mode=Partial, gby=[RegionID@1 as RegionID], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3, __decompose_4] +07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[CAST(ResolutionWidth@20 AS Float64) as __common_expr_1, RegionID, UserID, AdvEngineID], file_type=parquet 
query IIIRI rowsort SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWidth"), COUNT(DISTINCT "UserID") FROM hits GROUP BY "RegionID" ORDER BY c DESC LIMIT 10; @@ -730,23 +734,25 @@ EXPLAIN SELECT "CounterID", AVG(length("URL")) AS l, COUNT(*) AS c FROM hits WHE ---- logical_plan 01)Sort: l DESC NULLS FIRST, fetch=25 -02)--Projection: hits.CounterID, avg(length(hits.URL)) AS l, count(Int64(1)) AS count(*) AS c -03)----Filter: count(Int64(1)) > Int64(100000) -04)------Aggregate: groupBy=[[hits.CounterID]], aggr=[[avg(CAST(character_length(hits.URL) AS length(hits.URL) AS Float64)), count(Int64(1))]] -05)--------SubqueryAlias: hits -06)----------Filter: hits_raw.URL != Utf8View("") -07)------------TableScan: hits_raw projection=[CounterID, URL], partial_filters=[hits_raw.URL != Utf8View("")] +02)--Projection: hits.CounterID, CAST(__decompose_0 AS Float64) / CAST(__decompose_1 AS Float64) AS l, __decompose_2 AS c +03)----Filter: __decompose_2 > Int64(100000) +04)------Aggregate: groupBy=[[hits.CounterID]], aggr=[[sum(__common_expr_1) AS __decompose_0, count(__common_expr_1) AS __decompose_1, count(Int64(1)) AS __decompose_2]] +05)--------Projection: CAST(character_length(hits.URL) AS length(hits.URL) AS Float64) AS __common_expr_1, hits.CounterID +06)----------SubqueryAlias: hits +07)------------Filter: hits_raw.URL != Utf8View("") +08)--------------TableScan: hits_raw projection=[CounterID, URL], partial_filters=[hits_raw.URL != Utf8View("")] physical_plan 01)SortPreservingMergeExec: [l@1 DESC], fetch=25 02)--SortExec: TopK(fetch=25), expr=[l@1 DESC], preserve_partitioning=[true] -03)----ProjectionExec: expr=[CounterID@0 as CounterID, avg(length(hits.URL))@1 as l, count(Int64(1))@2 as c] -04)------FilterExec: count(Int64(1))@2 > 100000 -05)--------AggregateExec: mode=FinalPartitioned, gby=[CounterID@0 as CounterID], aggr=[avg(length(hits.URL)), count(Int64(1))] +03)----FilterExec: c@2 > 100000 +04)------ProjectionExec: expr=[CounterID@0 as 
CounterID, __decompose_0@1 / CAST(__decompose_1@2 AS Float64) as l, __decompose_2@3 as c] +05)--------AggregateExec: mode=FinalPartitioned, gby=[CounterID@0 as CounterID], aggr=[__decompose_0, __decompose_1, __decompose_2] 06)----------RepartitionExec: partitioning=Hash([CounterID@0], 4), input_partitions=4 -07)------------AggregateExec: mode=Partial, gby=[CounterID@0 as CounterID], aggr=[avg(length(hits.URL)), count(Int64(1))] -08)--------------FilterExec: URL@1 != -09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[CounterID, URL], file_type=parquet, predicate=URL@13 != , pruning_predicate=URL_null_count@2 != row_count@3 AND (URL_min@0 != OR != URL_max@1), required_guarantees=[URL not in ()] +07)------------AggregateExec: mode=Partial, gby=[CounterID@1 as CounterID], aggr=[__decompose_0, __decompose_1, __decompose_2] +08)--------------ProjectionExec: expr=[CAST(character_length(URL@1) AS Float64) as __common_expr_1, CounterID@0 as CounterID] +09)----------------FilterExec: URL@1 != +10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[CounterID, URL], file_type=parquet, predicate=URL@13 != , pruning_predicate=URL_null_count@2 != row_count@3 AND (URL_min@0 != OR != URL_max@1), required_guarantees=[URL not in ()] query IRI SELECT "CounterID", AVG(length("URL")) AS l, COUNT(*) AS c FROM hits WHERE "URL" <> '' GROUP BY "CounterID" HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; @@ -758,23 +764,25 @@ EXPLAIN SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1' ---- logical_plan 01)Sort: l DESC NULLS FIRST, fetch=25 -02)--Projection: 
regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1")) AS k, avg(length(hits.Referer)) AS l, count(Int64(1)) AS count(*) AS c, min(hits.Referer) -03)----Filter: count(Int64(1)) > Int64(100000) -04)------Aggregate: groupBy=[[regexp_replace(hits.Referer, Utf8View("^https?://(?:www\.)?([^/]+)/.*$"), Utf8View("\1")) AS regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))]], aggr=[[avg(CAST(character_length(hits.Referer) AS length(hits.Referer) AS Float64)), count(Int64(1)), min(hits.Referer)]] -05)--------SubqueryAlias: hits -06)----------Filter: hits_raw.Referer != Utf8View("") -07)------------TableScan: hits_raw projection=[Referer], partial_filters=[hits_raw.Referer != Utf8View("")] +02)--Projection: regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1")) AS k, CAST(__decompose_0 AS Float64) / CAST(__decompose_1 AS Float64) AS l, __decompose_2 AS c, __decompose_3 AS min(hits.Referer) +03)----Filter: __decompose_2 > Int64(100000) +04)------Aggregate: groupBy=[[regexp_replace(hits.Referer, Utf8View("^https?://(?:www\.)?([^/]+)/.*$"), Utf8View("\1")) AS regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))]], aggr=[[sum(__common_expr_1) AS __decompose_0, count(__common_expr_1) AS __decompose_1, count(Int64(1)) AS __decompose_2, min(hits.Referer) AS __decompose_3]] +05)--------Projection: CAST(character_length(hits.Referer) AS length(hits.Referer) AS Float64) AS __common_expr_1, hits.Referer +06)----------SubqueryAlias: hits +07)------------Filter: hits_raw.Referer != Utf8View("") +08)--------------TableScan: hits_raw projection=[Referer], partial_filters=[hits_raw.Referer != Utf8View("")] physical_plan 01)SortPreservingMergeExec: [l@1 DESC], fetch=25 02)--SortExec: TopK(fetch=25), expr=[l@1 DESC], preserve_partitioning=[true] -03)----ProjectionExec: expr=[regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))@0 as k, avg(length(hits.Referer))@1 
as l, count(Int64(1))@2 as c, min(hits.Referer)@3 as min(hits.Referer)] -04)------FilterExec: count(Int64(1))@2 > 100000 -05)--------AggregateExec: mode=FinalPartitioned, gby=[regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))@0 as regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))], aggr=[avg(length(hits.Referer)), count(Int64(1)), min(hits.Referer)] +03)----FilterExec: c@2 > 100000 +04)------ProjectionExec: expr=[regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))@0 as k, __decompose_0@1 / CAST(__decompose_1@2 AS Float64) as l, __decompose_2@3 as c, __decompose_3@4 as min(hits.Referer)] +05)--------AggregateExec: mode=FinalPartitioned, gby=[regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))@0 as regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3] 06)----------RepartitionExec: partitioning=Hash([regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))@0], 4), input_partitions=4 -07)------------AggregateExec: mode=Partial, gby=[regexp_replace(Referer@0, ^https?://(?:www\.)?([^/]+)/.*$, \1) as regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))], aggr=[avg(length(hits.Referer)), count(Int64(1)), min(hits.Referer)] -08)--------------FilterExec: Referer@0 != -09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[Referer], file_type=parquet, predicate=Referer@14 != , pruning_predicate=Referer_null_count@2 != row_count@3 AND (Referer_min@0 != OR != Referer_max@1), required_guarantees=[Referer not in ()] +07)------------AggregateExec: mode=Partial, gby=[regexp_replace(Referer@1, ^https?://(?:www\.)?([^/]+)/.*$, \1) as 
regexp_replace(hits.Referer,Utf8("^https?://(?:www\.)?([^/]+)/.*$"),Utf8("\1"))], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3] +08)--------------ProjectionExec: expr=[CAST(character_length(Referer@0) AS Float64) as __common_expr_1, Referer@0 as Referer] +09)----------------FilterExec: Referer@0 != +10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[Referer], file_type=parquet, predicate=Referer@14 != , pruning_predicate=Referer_null_count@2 != row_count@3 AND (Referer_min@0 != OR != Referer_max@1), required_guarantees=[Referer not in ()] query TRIT SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; @@ -807,22 +815,24 @@ EXPLAIN SELECT "SearchEngineID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AV ---- logical_plan 01)Sort: c DESC NULLS FIRST, fetch=10 -02)--Projection: hits.SearchEngineID, hits.ClientIP, count(Int64(1)) AS count(*) AS c, sum(hits.IsRefresh), avg(hits.ResolutionWidth) -03)----Aggregate: groupBy=[[hits.SearchEngineID, hits.ClientIP]], aggr=[[count(Int64(1)), sum(CAST(hits.IsRefresh AS Int64)), avg(CAST(hits.ResolutionWidth AS Float64))]] -04)------SubqueryAlias: hits -05)--------Projection: hits_raw.ClientIP, hits_raw.IsRefresh, hits_raw.ResolutionWidth, hits_raw.SearchEngineID -06)----------Filter: hits_raw.SearchPhrase != Utf8View("") -07)------------TableScan: hits_raw projection=[ClientIP, IsRefresh, ResolutionWidth, SearchEngineID, SearchPhrase], partial_filters=[hits_raw.SearchPhrase != Utf8View("")] +02)--Projection: hits.SearchEngineID, hits.ClientIP, __decompose_0 AS c, __decompose_1 AS sum(hits.IsRefresh), CAST(__decompose_2 AS Float64) / 
CAST(__decompose_3 AS Float64) AS avg(hits.ResolutionWidth) +03)----Aggregate: groupBy=[[hits.SearchEngineID, hits.ClientIP]], aggr=[[count(Int64(1)) AS __decompose_0, sum(CAST(hits.IsRefresh AS Int64)) AS __decompose_1, sum(__common_expr_1) AS __decompose_2, count(__common_expr_1) AS __decompose_3]] +04)------Projection: CAST(hits.ResolutionWidth AS Float64) AS __common_expr_1, hits.ClientIP, hits.IsRefresh, hits.SearchEngineID +05)--------SubqueryAlias: hits +06)----------Projection: hits_raw.ClientIP, hits_raw.IsRefresh, hits_raw.ResolutionWidth, hits_raw.SearchEngineID +07)------------Filter: hits_raw.SearchPhrase != Utf8View("") +08)--------------TableScan: hits_raw projection=[ClientIP, IsRefresh, ResolutionWidth, SearchEngineID, SearchPhrase], partial_filters=[hits_raw.SearchPhrase != Utf8View("")] physical_plan 01)SortPreservingMergeExec: [c@2 DESC], fetch=10 02)--SortExec: TopK(fetch=10), expr=[c@2 DESC], preserve_partitioning=[true] -03)----ProjectionExec: expr=[SearchEngineID@0 as SearchEngineID, ClientIP@1 as ClientIP, count(Int64(1))@2 as c, sum(hits.IsRefresh)@3 as sum(hits.IsRefresh), avg(hits.ResolutionWidth)@4 as avg(hits.ResolutionWidth)] -04)------AggregateExec: mode=FinalPartitioned, gby=[SearchEngineID@0 as SearchEngineID, ClientIP@1 as ClientIP], aggr=[count(Int64(1)), sum(hits.IsRefresh), avg(hits.ResolutionWidth)] +03)----ProjectionExec: expr=[SearchEngineID@0 as SearchEngineID, ClientIP@1 as ClientIP, __decompose_0@2 as c, __decompose_1@3 as sum(hits.IsRefresh), __decompose_2@4 / CAST(__decompose_3@5 AS Float64) as avg(hits.ResolutionWidth)] +04)------AggregateExec: mode=FinalPartitioned, gby=[SearchEngineID@0 as SearchEngineID, ClientIP@1 as ClientIP], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3] 05)--------RepartitionExec: partitioning=Hash([SearchEngineID@0, ClientIP@1], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[SearchEngineID@3 as SearchEngineID, ClientIP@0 as ClientIP], 
aggr=[count(Int64(1)), sum(hits.IsRefresh), avg(hits.ResolutionWidth)] -07)------------FilterExec: SearchPhrase@4 != , projection=[ClientIP@0, IsRefresh@1, ResolutionWidth@2, SearchEngineID@3] -08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[ClientIP, IsRefresh, ResolutionWidth, SearchEngineID, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != , pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 != OR != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()] +06)----------AggregateExec: mode=Partial, gby=[SearchEngineID@3 as SearchEngineID, ClientIP@1 as ClientIP], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3] +07)------------ProjectionExec: expr=[CAST(ResolutionWidth@0 AS Float64) as __common_expr_1, ClientIP@1 as ClientIP, IsRefresh@2 as IsRefresh, SearchEngineID@3 as SearchEngineID] +08)--------------FilterExec: SearchPhrase@4 != , projection=[ResolutionWidth@2, ClientIP@0, IsRefresh@1, SearchEngineID@3] +09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[ClientIP, IsRefresh, ResolutionWidth, SearchEngineID, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != , pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 != OR != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()] query IIIIR SELECT "SearchEngineID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "ClientIP" ORDER BY c DESC LIMIT 10; @@ -834,22 +844,24 @@ EXPLAIN SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), 
AVG("Reso ---- logical_plan 01)Sort: c DESC NULLS FIRST, fetch=10 -02)--Projection: hits.WatchID, hits.ClientIP, count(Int64(1)) AS count(*) AS c, sum(hits.IsRefresh), avg(hits.ResolutionWidth) -03)----Aggregate: groupBy=[[hits.WatchID, hits.ClientIP]], aggr=[[count(Int64(1)), sum(CAST(hits.IsRefresh AS Int64)), avg(CAST(hits.ResolutionWidth AS Float64))]] -04)------SubqueryAlias: hits -05)--------Projection: hits_raw.WatchID, hits_raw.ClientIP, hits_raw.IsRefresh, hits_raw.ResolutionWidth -06)----------Filter: hits_raw.SearchPhrase != Utf8View("") -07)------------TableScan: hits_raw projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth, SearchPhrase], partial_filters=[hits_raw.SearchPhrase != Utf8View("")] +02)--Projection: hits.WatchID, hits.ClientIP, __decompose_0 AS c, __decompose_1 AS sum(hits.IsRefresh), CAST(__decompose_2 AS Float64) / CAST(__decompose_3 AS Float64) AS avg(hits.ResolutionWidth) +03)----Aggregate: groupBy=[[hits.WatchID, hits.ClientIP]], aggr=[[count(Int64(1)) AS __decompose_0, sum(CAST(hits.IsRefresh AS Int64)) AS __decompose_1, sum(__common_expr_1) AS __decompose_2, count(__common_expr_1) AS __decompose_3]] +04)------Projection: CAST(hits.ResolutionWidth AS Float64) AS __common_expr_1, hits.WatchID, hits.ClientIP, hits.IsRefresh +05)--------SubqueryAlias: hits +06)----------Projection: hits_raw.WatchID, hits_raw.ClientIP, hits_raw.IsRefresh, hits_raw.ResolutionWidth +07)------------Filter: hits_raw.SearchPhrase != Utf8View("") +08)--------------TableScan: hits_raw projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth, SearchPhrase], partial_filters=[hits_raw.SearchPhrase != Utf8View("")] physical_plan 01)SortPreservingMergeExec: [c@2 DESC], fetch=10 02)--SortExec: TopK(fetch=10), expr=[c@2 DESC], preserve_partitioning=[true] -03)----ProjectionExec: expr=[WatchID@0 as WatchID, ClientIP@1 as ClientIP, count(Int64(1))@2 as c, sum(hits.IsRefresh)@3 as sum(hits.IsRefresh), avg(hits.ResolutionWidth)@4 as avg(hits.ResolutionWidth)] 
-04)------AggregateExec: mode=FinalPartitioned, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[count(Int64(1)), sum(hits.IsRefresh), avg(hits.ResolutionWidth)] +03)----ProjectionExec: expr=[WatchID@0 as WatchID, ClientIP@1 as ClientIP, __decompose_0@2 as c, __decompose_1@3 as sum(hits.IsRefresh), __decompose_2@4 / CAST(__decompose_3@5 AS Float64) as avg(hits.ResolutionWidth)] +04)------AggregateExec: mode=FinalPartitioned, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3] 05)--------RepartitionExec: partitioning=Hash([WatchID@0, ClientIP@1], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[count(Int64(1)), sum(hits.IsRefresh), avg(hits.ResolutionWidth)] -07)------------FilterExec: SearchPhrase@4 != , projection=[WatchID@0, ClientIP@1, IsRefresh@2, ResolutionWidth@3] -08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != , pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 != OR != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()] +06)----------AggregateExec: mode=Partial, gby=[WatchID@1 as WatchID, ClientIP@2 as ClientIP], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3] +07)------------ProjectionExec: expr=[CAST(ResolutionWidth@0 AS Float64) as __common_expr_1, WatchID@1 as WatchID, ClientIP@2 as ClientIP, IsRefresh@3 as IsRefresh] +08)--------------FilterExec: SearchPhrase@4 != , projection=[ResolutionWidth@3, WatchID@0, ClientIP@1, IsRefresh@2] +09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 
+10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != , pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 != OR != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()] query IIIIR SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10; @@ -861,18 +873,19 @@ EXPLAIN SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("Reso ---- logical_plan 01)Sort: c DESC NULLS FIRST, fetch=10 -02)--Projection: hits.WatchID, hits.ClientIP, count(Int64(1)) AS count(*) AS c, sum(hits.IsRefresh), avg(hits.ResolutionWidth) -03)----Aggregate: groupBy=[[hits.WatchID, hits.ClientIP]], aggr=[[count(Int64(1)), sum(CAST(hits.IsRefresh AS Int64)), avg(CAST(hits.ResolutionWidth AS Float64))]] -04)------SubqueryAlias: hits -05)--------TableScan: hits_raw projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth] +02)--Projection: hits.WatchID, hits.ClientIP, __decompose_0 AS c, __decompose_1 AS sum(hits.IsRefresh), CAST(__decompose_2 AS Float64) / CAST(__decompose_3 AS Float64) AS avg(hits.ResolutionWidth) +03)----Aggregate: groupBy=[[hits.WatchID, hits.ClientIP]], aggr=[[count(Int64(1)) AS __decompose_0, sum(CAST(hits.IsRefresh AS Int64)) AS __decompose_1, sum(__common_expr_1) AS __decompose_2, count(__common_expr_1) AS __decompose_3]] +04)------Projection: CAST(hits.ResolutionWidth AS Float64) AS __common_expr_1, hits.WatchID, hits.ClientIP, hits.IsRefresh +05)--------SubqueryAlias: hits +06)----------TableScan: hits_raw projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth] physical_plan 01)SortPreservingMergeExec: [c@2 DESC], fetch=10 02)--SortExec: TopK(fetch=10), expr=[c@2 DESC], 
preserve_partitioning=[true] -03)----ProjectionExec: expr=[WatchID@0 as WatchID, ClientIP@1 as ClientIP, count(Int64(1))@2 as c, sum(hits.IsRefresh)@3 as sum(hits.IsRefresh), avg(hits.ResolutionWidth)@4 as avg(hits.ResolutionWidth)] -04)------AggregateExec: mode=FinalPartitioned, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[count(Int64(1)), sum(hits.IsRefresh), avg(hits.ResolutionWidth)] +03)----ProjectionExec: expr=[WatchID@0 as WatchID, ClientIP@1 as ClientIP, __decompose_0@2 as c, __decompose_1@3 as sum(hits.IsRefresh), __decompose_2@4 / CAST(__decompose_3@5 AS Float64) as avg(hits.ResolutionWidth)] +04)------AggregateExec: mode=FinalPartitioned, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3] 05)--------RepartitionExec: partitioning=Hash([WatchID@0, ClientIP@1], 4), input_partitions=1 -06)----------AggregateExec: mode=Partial, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[count(Int64(1)), sum(hits.IsRefresh), avg(hits.ResolutionWidth)] -07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth], file_type=parquet +06)----------AggregateExec: mode=Partial, gby=[WatchID@1 as WatchID, ClientIP@2 as ClientIP], aggr=[__decompose_0, __decompose_1, __decompose_2, __decompose_3] +07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[CAST(ResolutionWidth@20 AS Float64) as __common_expr_1, WatchID, ClientIP, IsRefresh], file_type=parquet query IIIIR rowsort SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10; diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 467afe7b6c2ba..9329ad2860a02 100644 --- 
a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -194,6 +194,7 @@ logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE +logical_plan after decompose_aggregate SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE @@ -218,6 +219,7 @@ logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE +logical_plan after decompose_aggregate SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE @@ -566,6 +568,7 @@ logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE +logical_plan after decompose_aggregate SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE @@ -590,6 +593,7 @@ logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE +logical_plan after decompose_aggregate SAME TEXT AS ABOVE logical_plan after 
single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE @@ -647,11 +651,11 @@ logical_plan 02)--{ 03)----"Plan": { 04)------"Node Type": "Values", -05)------"Values": "(Int64(1))", -06)------"Plans": [], -07)------"Output": [ -08)--------"column1" -09)------] +05)------"Output": [ +06)--------"column1" +07)------], +08)------"Plans": [], +09)------"Values": "(Int64(1))" 10)----} 11)--} 12)] diff --git a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt index da1e7de22bb7a..a3745e19376ea 100644 --- a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt +++ b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt @@ -60,8 +60,8 @@ FROM test_table t group by 1, 2, 3 ---- logical_plan -01)Projection: Int64(123), Int64(456), Int64(789), count(Int64(1)), avg(t.c12) -02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)), avg(t.c12)]] +01)Projection: Int64(123), Int64(456), Int64(789), __common_expr_1 AS count(Int64(1)), CAST(__decompose_1 AS Float64) / CAST(__common_expr_1 AS Float64) AS avg(t.c12) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS __common_expr_1, sum(t.c12) AS __decompose_1]] 03)----SubqueryAlias: t 04)------TableScan: test_table projection=[c12] diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index 175d7d90cd8ed..03553e135e63b 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -281,15 +281,15 @@ EXPLAIN SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY f_d ---- logical_plan 01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST -02)--Projection: fact_table_ordered.f_dkey, count(Int64(1)) 
AS count(*), avg(fact_table_ordered.value) -03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey]], aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]] +02)--Projection: fact_table_ordered.f_dkey, __decompose_0 AS count(*), CAST(__decompose_1 AS Float64) / CAST(__decompose_2 AS Float64) AS avg(fact_table_ordered.value) +03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey]], aggr=[[count(Int64(1)) AS __decompose_0, sum(fact_table_ordered.value) AS __decompose_1, count(fact_table_ordered.value) AS __decompose_2]] 04)------TableScan: fact_table_ordered projection=[value, f_dkey] physical_plan 01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST] -02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)] -03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted +02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, __decompose_0@1 as count(*), __decompose_1@2 / CAST(__decompose_2@3 AS Float64) as avg(fact_table_ordered.value)] +03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[__decompose_0, __decompose_1, __decompose_2], ordering_mode=Sorted 04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST -05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted +05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[__decompose_0, __decompose_1, __decompose_2], ordering_mode=Sorted 06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet # Verify results without optimization @@ -313,13 +313,13 @@ EXPLAIN SELECT f_dkey, count(*), avg(value) FROM fact_table_ordered GROUP BY f_d ---- logical_plan 01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST -02)--Projection: fact_table_ordered.f_dkey, count(Int64(1)) AS count(*), avg(fact_table_ordered.value) -03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey]], aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]] +02)--Projection: fact_table_ordered.f_dkey, __decompose_0 AS count(*), CAST(__decompose_1 AS Float64) / CAST(__decompose_2 AS Float64) AS avg(fact_table_ordered.value) +03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey]], aggr=[[count(Int64(1)) AS __decompose_0, sum(fact_table_ordered.value) AS __decompose_1, count(fact_table_ordered.value) AS __decompose_2]] 04)------TableScan: fact_table_ordered projection=[value, f_dkey] physical_plan 01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST] -02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)] -03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted +02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, __decompose_0@1 as count(*), __decompose_1@2 / CAST(__decompose_2@3 AS Float64) as avg(fact_table_ordered.value)] +03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[__decompose_0, __decompose_1, __decompose_2], ordering_mode=Sorted 04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet query TIR @@ -716,13 +716,14 @@ FROM fact_table GROUP BY f_dkey, timestamp; ---- logical_plan -01)Projection: fact_table.f_dkey, fact_table.timestamp, count(Int64(1)) AS count(*), avg(fact_table.value) -02)--Aggregate: groupBy=[[fact_table.f_dkey, fact_table.timestamp]], aggr=[[count(Int64(1)), avg(fact_table.value)]] +01)Projection: fact_table.f_dkey, fact_table.timestamp, __decompose_0 AS count(*), CAST(__decompose_1 AS Float64) / CAST(__decompose_2 AS Float64) AS avg(fact_table.value) +02)--Aggregate: groupBy=[[fact_table.f_dkey, fact_table.timestamp]], aggr=[[count(Int64(1)) AS __decompose_0, sum(fact_table.value) AS __decompose_1, count(fact_table.value) AS __decompose_2]] 03)----TableScan: fact_table projection=[timestamp, value, f_dkey] physical_plan -01)ProjectionExec: expr=[f_dkey@0 as f_dkey, timestamp@1 as timestamp, count(Int64(1))@2 as count(*), avg(fact_table.value)@3 as avg(fact_table.value)] -02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, timestamp@0 as timestamp], aggr=[count(Int64(1)), avg(fact_table.value)] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet +01)ProjectionExec: expr=[f_dkey@0 as f_dkey, timestamp@1 as timestamp, __decompose_0@2 as count(*), __decompose_1@3 / CAST(__decompose_2@4 AS Float64) as 
avg(fact_table.value)] +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 +03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, timestamp@0 as timestamp], aggr=[__decompose_0, __decompose_1, __decompose_2] +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet query TPIR rowsort SELECT f_dkey, timestamp, diff --git a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt index dbf31dec5e118..5d77a041728f6 100644 --- a/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt +++ b/datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt @@ -155,15 +155,15 @@ ORDER BY f_dkey, time_bin; ---- logical_plan 01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST, time_bin ASC NULLS LAST -02)--Projection: fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp) AS time_bin, count(Int64(1)) AS count(*), avg(fact_table_ordered.value) -03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), fact_table_ordered.timestamp)]], aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]] +02)--Projection: fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp) AS time_bin, __decompose_0 AS count(*), CAST(__decompose_1 AS Float64) / 
CAST(__decompose_2 AS Float64) AS avg(fact_table_ordered.value) +03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), fact_table_ordered.timestamp)]], aggr=[[count(Int64(1)) AS __decompose_0, sum(fact_table_ordered.value) AS __decompose_1, count(fact_table_ordered.value) AS __decompose_2]] 04)------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey] physical_plan 01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST] -02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin, count(Int64(1))@2 as count(*), avg(fact_table_ordered.value)@3 as avg(fact_table_ordered.value)] -03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted +02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin, __decompose_0@2 as count(*), __decompose_1@3 / CAST(__decompose_2@4 AS Float64) as avg(fact_table_ordered.value)] +03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[__decompose_0, __decompose_1, __decompose_2], 
ordering_mode=Sorted 04)------RepartitionExec: partitioning=Hash([f_dkey@0, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 ASC NULLS LAST -05)--------AggregateExec: mode=Partial, gby=[f_dkey@2 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted +05)--------AggregateExec: mode=Partial, gby=[f_dkey@2 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[__decompose_0, __decompose_1, __decompose_2], ordering_mode=Sorted 06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet # Verify results without subset satisfaction @@ -197,13 +197,13 @@ ORDER BY f_dkey, time_bin; ---- logical_plan 01)Sort: fact_table_ordered.f_dkey ASC NULLS LAST, time_bin ASC NULLS LAST -02)--Projection: fact_table_ordered.f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp) AS time_bin, count(Int64(1)) AS count(*), avg(fact_table_ordered.value) -03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), fact_table_ordered.timestamp)]], aggr=[[count(Int64(1)), avg(fact_table_ordered.value)]] +02)--Projection: fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp) AS time_bin, __decompose_0 AS count(*), CAST(__decompose_1 AS Float64) / CAST(__decompose_2 AS Float64) AS avg(fact_table_ordered.value) +03)----Aggregate: groupBy=[[fact_table_ordered.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), fact_table_ordered.timestamp)]], aggr=[[count(Int64(1)) AS __decompose_0, sum(fact_table_ordered.value) AS __decompose_1, count(fact_table_ordered.value) AS __decompose_2]] 04)------TableScan: fact_table_ordered projection=[timestamp, value, f_dkey] physical_plan 01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST] -02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin, count(Int64(1))@2 as count(*), avg(fact_table_ordered.value)@3 as avg(fact_table_ordered.value)] -03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted +02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, 
date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)@1 as time_bin, __decompose_0@2 as count(*), __decompose_1@3 / CAST(__decompose_2@4 AS Float64) as avg(fact_table_ordered.value)] +03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@0) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),fact_table_ordered.timestamp)], aggr=[__decompose_0, __decompose_1, __decompose_2], ordering_mode=Sorted 04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_subset_satisfaction/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet # Verify results match with subset satisfaction @@ -351,8 +351,8 @@ ORDER BY env, time_bin; ---- logical_plan 01)Sort: a.env ASC NULLS LAST, a.time_bin ASC NULLS LAST -02)--Projection: a.env, a.time_bin, avg(a.max_bin_value) AS avg_max_value -03)----Aggregate: groupBy=[[a.env, a.time_bin]], aggr=[[avg(a.max_bin_value)]] +02)--Projection: a.env, a.time_bin, CAST(__decompose_0 AS Float64) / CAST(__decompose_1 AS Float64) AS avg_max_value +03)----Aggregate: groupBy=[[a.env, a.time_bin]], aggr=[[sum(a.max_bin_value) AS __decompose_0, count(a.max_bin_value) AS __decompose_1]] 04)------SubqueryAlias: a 05)--------Projection: date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp) AS time_bin, j.env, max(j.value) AS max_bin_value 06)----------Aggregate: 
groupBy=[[j.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), j.timestamp), j.env]], aggr=[[max(j.value)]] @@ -368,10 +368,10 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST] 02)--SortExec: expr=[env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST], preserve_partitioning=[true] -03)----ProjectionExec: expr=[env@0 as env, time_bin@1 as time_bin, avg(a.max_bin_value)@2 as avg_max_value] -04)------AggregateExec: mode=FinalPartitioned, gby=[env@0 as env, time_bin@1 as time_bin], aggr=[avg(a.max_bin_value)] +03)----ProjectionExec: expr=[env@0 as env, time_bin@1 as time_bin, __decompose_0@2 / CAST(__decompose_1@3 AS Float64) as avg_max_value] +04)------AggregateExec: mode=FinalPartitioned, gby=[env@0 as env, time_bin@1 as time_bin], aggr=[__decompose_0, __decompose_1] 05)--------RepartitionExec: partitioning=Hash([env@0, time_bin@1], 3), input_partitions=3 -06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as time_bin], aggr=[avg(a.max_bin_value)] +06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as time_bin], aggr=[__decompose_0, __decompose_1] 07)------------ProjectionExec: expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, env@2 as env, max(j.value)@3 as max_bin_value] 08)--------------AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@2 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1]) 09)----------------RepartitionExec: partitioning=Hash([f_dkey@0, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 
}"),j.timestamp)@1, env@2], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 ASC NULLS LAST @@ -448,8 +448,8 @@ ORDER BY env, time_bin; ---- logical_plan 01)Sort: a.env ASC NULLS LAST, a.time_bin ASC NULLS LAST -02)--Projection: a.env, a.time_bin, avg(a.max_bin_value) AS avg_max_value -03)----Aggregate: groupBy=[[a.env, a.time_bin]], aggr=[[avg(a.max_bin_value)]] +02)--Projection: a.env, a.time_bin, CAST(__decompose_0 AS Float64) / CAST(__decompose_1 AS Float64) AS avg_max_value +03)----Aggregate: groupBy=[[a.env, a.time_bin]], aggr=[[sum(a.max_bin_value) AS __decompose_0, count(a.max_bin_value) AS __decompose_1]] 04)------SubqueryAlias: a 05)--------Projection: date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp) AS time_bin, j.env, max(j.value) AS max_bin_value 06)----------Aggregate: groupBy=[[j.f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"), j.timestamp), j.env]], aggr=[[max(j.value)]] @@ -465,10 +465,10 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST] 02)--SortExec: expr=[env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST], preserve_partitioning=[true] -03)----ProjectionExec: expr=[env@0 as env, time_bin@1 as time_bin, avg(a.max_bin_value)@2 as avg_max_value] -04)------AggregateExec: mode=FinalPartitioned, gby=[env@0 as env, time_bin@1 as time_bin], aggr=[avg(a.max_bin_value)] +03)----ProjectionExec: expr=[env@0 as env, time_bin@1 as time_bin, __decompose_0@2 / CAST(__decompose_1@3 AS Float64) as avg_max_value] +04)------AggregateExec: mode=FinalPartitioned, gby=[env@0 as env, time_bin@1 as time_bin], aggr=[__decompose_0, __decompose_1] 05)--------RepartitionExec: partitioning=Hash([env@0, time_bin@1], 3), input_partitions=3 
-06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as time_bin], aggr=[avg(a.max_bin_value)] +06)----------AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as time_bin], aggr=[__decompose_0, __decompose_1] 07)------------ProjectionExec: expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, env@2 as env, max(j.value)@3 as max_bin_value] 08)--------------AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1]) 09)----------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[f_dkey@4, env@0, timestamp@2, value@3]