diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 414217612d1e..be76c069f0b7 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -22,12 +22,13 @@ use std::sync::Arc;
 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
 
-use datafusion_common::Result;
+use datafusion_common::{DFSchema, Result};
 use datafusion_expr::{
     col,
     expr::AggregateFunction,
-    logical_plan::{Aggregate, LogicalPlan},
-    Expr,
+    logical_plan::{Aggregate, LogicalPlan, Projection},
+    utils::columnize_expr,
+    Expr, ExprSchemable,
 };
 use hashbrown::HashSet;
 
@@ -152,7 +153,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
 
                     // replace the distinct arg with alias
                     let mut group_fields_set = HashSet::new();
-                    let outer_aggr_exprs = aggr_expr
+                    let new_aggr_exprs = aggr_expr
                         .iter()
                         .map(|aggr_expr| match aggr_expr {
                             Expr::AggregateFunction(AggregateFunction {
@@ -174,24 +175,67 @@ impl OptimizerRule for SingleDistinctToGroupBy {
                                     false, // intentional to remove distinct here
                                     filter.clone(),
                                     order_by.clone(),
-                                ))
-                                .alias(aggr_expr.display_name()?))
+                                )))
                             }
                             _ => Ok(aggr_expr.clone()),
                         })
                         .collect::<Result<Vec<_>>>()?;
 
                     // construct the inner AggrPlan
+                    let inner_fields = inner_group_exprs
+                        .iter()
+                        .map(|expr| expr.to_field(input.schema()))
+                        .collect::<Result<Vec<_>>>()?;
+                    let inner_schema = DFSchema::new_with_metadata(
+                        inner_fields,
+                        input.schema().metadata().clone(),
+                    )?;
                     let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new(
                         input.clone(),
                         inner_group_exprs,
                         Vec::new(),
                     )?);
 
-                    Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new(
+                    let outer_fields = outer_group_exprs
+                        .iter()
+                        .chain(new_aggr_exprs.iter())
+                        .map(|expr| expr.to_field(&inner_schema))
+                        .collect::<Result<Vec<_>>>()?;
+                    let outer_aggr_schema = Arc::new(DFSchema::new_with_metadata(
+                        outer_fields,
+                        input.schema().metadata().clone(),
+                    )?);
+
+                    // so the aggregates are displayed in the same way even after the rewrite
+                    // this optimizer has two kinds of alias:
+                    // - group_by aggr
+                    // - aggr expr
+                    let group_size = group_expr.len();
+                    let alias_expr = out_group_expr_with_alias
+                        .into_iter()
+                        .map(|(group_expr, original_field)| {
+                            if let Some(name) = original_field {
+                                group_expr.alias(name)
+                            } else {
+                                group_expr
+                            }
+                        })
+                        .chain(new_aggr_exprs.iter().enumerate().map(|(idx, expr)| {
+                            let idx = idx + group_size;
+                            let name = fields[idx].qualified_name();
+                            columnize_expr(expr.clone().alias(name), &outer_aggr_schema)
+                        }))
+                        .collect();
+
+                    let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new(
                         Arc::new(inner_agg),
                         outer_group_exprs,
-                        outer_aggr_exprs,
+                        new_aggr_exprs,
+                    )?);
+
+                    Ok(Some(LogicalPlan::Projection(Projection::try_new(
+                        alias_expr,
+                        Arc::new(outer_aggr),
                     )?)))
                 } else {
                     Ok(None)
@@ -255,9 +299,10 @@ mod tests {
             .build()?;
 
         // Should work
-        let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.b)]] [COUNT(DISTINCT test.b):Int64;N]\
-        \n  Aggregate: groupBy=[[test.b AS alias1]], aggr=[[]] [alias1:UInt32]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        let expected = "Projection: COUNT(alias1) AS COUNT(DISTINCT test.b) [COUNT(DISTINCT test.b):Int64;N]\
+        \n  Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]] [COUNT(alias1):Int64;N]\
+        \n    Aggregate: groupBy=[[test.b AS alias1]], aggr=[[]] [alias1:UInt32]\
+        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
 
         assert_optimized_plan_equal(&plan, expected)
     }
@@ -328,9 +373,10 @@ mod tests {
             .aggregate(Vec::<Expr>::new(), vec![count_distinct(lit(2) * col("b"))])?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT Int32(2) * test.b)]] [COUNT(DISTINCT Int32(2) * test.b):Int64;N]\
-        \n  Aggregate: groupBy=[[Int32(2) * test.b AS alias1]], aggr=[[]] [alias1:Int32]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        let expected = "Projection: COUNT(alias1) AS COUNT(DISTINCT Int32(2) * test.b) [COUNT(DISTINCT Int32(2) * test.b):Int64;N]\
+        \n  Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]] [COUNT(alias1):Int64;N]\
+        \n    Aggregate: groupBy=[[Int32(2) * test.b AS alias1]], aggr=[[]] [alias1:Int32]\
+        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
 
         assert_optimized_plan_equal(&plan, expected)
     }
@@ -344,9 +390,10 @@ mod tests {
             .build()?;
 
         // Should work
-        let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.b)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N]\
-        \n  Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        let expected = "Projection: test.a, COUNT(alias1) AS COUNT(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):Int64;N]\
+        \n  Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1)]] [a:UInt32, COUNT(alias1):Int64;N]\
+        \n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
+        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
 
         assert_optimized_plan_equal(&plan, expected)
     }
@@ -389,9 +436,10 @@ mod tests {
             )?
            .build()?;
         // Should work
-        let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\
-        \n  Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        let expected = "Projection: test.a, COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\
+        \n  Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1), MAX(alias1)]] [a:UInt32, COUNT(alias1):Int64;N, MAX(alias1):UInt32;N]\
+        \n    Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
+        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
 
         assert_optimized_plan_equal(&plan, expected)
     }
@@ -423,9 +471,10 @@ mod tests {
             .build()?;
 
         // Should work
-        let expected = "Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.c)]] [group_alias_0:Int32, COUNT(DISTINCT test.c):Int64;N]\
-        \n  Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\
-        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+        let expected = "Projection: group_alias_0 AS test.a + Int32(1), COUNT(alias1) AS COUNT(DISTINCT test.c) [test.a + Int32(1):Int32, COUNT(DISTINCT test.c):Int64;N]\
+        \n  Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
+        \n    Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\
+        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
 
         assert_optimized_plan_equal(&plan, expected)
     }
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index 105f11f21628..300e92a7352f 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -3823,17 +3823,17 @@ query TT
 EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE)) FROM t1 GROUP BY y;
 ----
 logical_plan
-Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)
---Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x)]]
+Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x)
+--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
 ----Aggregate: groupBy=[[t1.y, CAST(t1.x AS Float64)t1.x AS t1.x AS alias1]], aggr=[[]]
 ------Projection: CAST(t1.x AS Float64) AS CAST(t1.x AS Float64)t1.x, t1.y
 --------TableScan: t1 projection=[x, y]
 physical_plan
-ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)]
---AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
+ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(DISTINCT t1.x)]
+--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
 ----CoalesceBatchesExec: target_batch_size=2
 ------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8
---------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
+--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
 ----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as alias1], aggr=[]
 ------------CoalesceBatchesExec: target_batch_size=2
 --------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), input_partitions=8
@@ -3841,3 +3841,26 @@ ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t
 ------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS Float64)t1.x@0 as alias1], aggr=[]
 --------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y]
 ----------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table t1
+
+# Reproducer for https://github.com/apache/arrow-datafusion/issues/8175
+
+statement ok
+create table t1(state string, city string, min_temp float, area int, time timestamp) as values
+  ('MA', 'Boston', 70.4, 1, 50),
+  ('MA', 'Bedford', 71.59, 2, 150);
+
+query RI
+select date_part('year', time) as bla, count(distinct state) as count from t1 group by bla;
+----
+1970 1
+
+query PI
+select date_bin(interval '1 year', time) as bla, count(distinct state) as count from t1 group by bla;
+----
+1970-01-01T00:00:00 1
+
+statement ok
+drop table t1
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 24893297f163..fa3a6cff8c4a 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1361,29 +1361,31 @@ from join_t1
 inner join join_t2 on join_t1.t1_id = join_t2.t2_id
 ----
 logical_plan
-Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT join_t1.t1_id)]]
---Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
-----Projection: join_t1.t1_id
-------Inner Join: join_t1.t1_id = join_t2.t2_id
---------TableScan: join_t1 projection=[t1_id]
---------TableScan: join_t2 projection=[t2_id]
+Projection: COUNT(alias1) AS COUNT(DISTINCT join_t1.t1_id)
+--Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]]
+----Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
+------Projection: join_t1.t1_id
+--------Inner Join: join_t1.t1_id = join_t2.t2_id
+----------TableScan: join_t1 projection=[t1_id]
+----------TableScan: join_t2 projection=[t2_id]
 physical_plan
-AggregateExec: mode=Final, gby=[], aggr=[COUNT(DISTINCT join_t1.t1_id)]
---CoalescePartitionsExec
-----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(DISTINCT join_t1.t1_id)]
-------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
---------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
-----------ProjectionExec: expr=[t1_id@0 as t1_id]
-------------CoalesceBatchesExec: target_batch_size=2
---------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
-----------------CoalesceBatchesExec: target_batch_size=2
-------------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
---------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-----------------------MemoryExec: partitions=1, partition_sizes=[1]
-----------------CoalesceBatchesExec: target_batch_size=2
-------------------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
---------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-----------------------MemoryExec: partitions=1, partition_sizes=[1]
+ProjectionExec: expr=[COUNT(alias1)@0 as COUNT(DISTINCT join_t1.t1_id)]
+--AggregateExec: mode=Final, gby=[], aggr=[COUNT(alias1)]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[], aggr=[COUNT(alias1)]
+--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
+----------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
+------------ProjectionExec: expr=[t1_id@0 as t1_id]
+--------------CoalesceBatchesExec: target_batch_size=2
+----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
+------------------CoalesceBatchesExec: target_batch_size=2
+--------------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
+----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[1]
+------------------CoalesceBatchesExec: target_batch_size=2
+--------------------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
+----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[1]
 
 statement ok
 set datafusion.explain.logical_plan_only = true;
@@ -3407,3 +3409,4 @@ set datafusion.optimizer.prefer_existing_sort = false;
 
 statement ok
 drop table annotated_data;
+
diff --git a/datafusion/sqllogictest/test_files/tpch/q16.slt.part b/datafusion/sqllogictest/test_files/tpch/q16.slt.part
index c04782958917..b93872929fe5 100644
--- a/datafusion/sqllogictest/test_files/tpch/q16.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q16.slt.part
@@ -52,8 +52,8 @@ limit 10;
 logical_plan
 Limit: skip=0, fetch=10
 --Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type ASC NULLS LAST, part.p_size ASC NULLS LAST, fetch=10
-----Projection: part.p_brand, part.p_type, part.p_size, COUNT(DISTINCT partsupp.ps_suppkey) AS supplier_cnt
-------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)]]
+----Projection: part.p_brand, part.p_type, part.p_size, COUNT(alias1) AS supplier_cnt
+------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size]], aggr=[[COUNT(alias1)]]
 --------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size, partsupp.ps_suppkey AS alias1]], aggr=[[]]
 ----------LeftAnti Join: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey
 ------------Projection: partsupp.ps_suppkey, part.p_brand, part.p_type, part.p_size
@@ -69,11 +69,11 @@ physical_plan
 GlobalLimitExec: skip=0, fetch=10
 --SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST], fetch=10
 ----SortExec: TopK(fetch=10), expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
-------ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size, COUNT(DISTINCT partsupp.ps_suppkey)@3 as supplier_cnt]
---------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(DISTINCT partsupp.ps_suppkey)]
+------ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size, COUNT(alias1)@3 as supplier_cnt]
+--------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(alias1)]
 ----------CoalesceBatchesExec: target_batch_size=8192
 ------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, p_size@2], 4), input_partitions=4
---------------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(DISTINCT partsupp.ps_suppkey)]
+--------------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(alias1)]
 ----------------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size, alias1@3 as alias1], aggr=[]
 ------------------CoalesceBatchesExec: target_batch_size=8192
 --------------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, p_size@2, alias1@3], 4), input_partitions=4