Skip to content

Commit

Permalink
Revert "Minor: remove unnecessary projection in `single_distinct_to_g… (
Browse files Browse the repository at this point in the history
apache#8176)

* Revert "Minor: remove unnecessary projection in `single_distinct_to_group_by` rule (apache#8061)"

This reverts commit 15d8c9b.

* Add regression test

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
NGA-TRAN and alamb authored Nov 14, 2023
1 parent f390f15 commit abb2ae7
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 55 deletions.
95 changes: 72 additions & 23 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use std::sync::Arc;
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::Result;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{
col,
expr::AggregateFunction,
logical_plan::{Aggregate, LogicalPlan},
Expr,
logical_plan::{Aggregate, LogicalPlan, Projection},
utils::columnize_expr,
Expr, ExprSchemable,
};

use hashbrown::HashSet;
Expand Down Expand Up @@ -152,7 +153,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {

// replace the distinct arg with alias
let mut group_fields_set = HashSet::new();
let outer_aggr_exprs = aggr_expr
let new_aggr_exprs = aggr_expr
.iter()
.map(|aggr_expr| match aggr_expr {
Expr::AggregateFunction(AggregateFunction {
Expand All @@ -174,24 +175,67 @@ impl OptimizerRule for SingleDistinctToGroupBy {
false, // intentional to remove distinct here
filter.clone(),
order_by.clone(),
))
.alias(aggr_expr.display_name()?))
)))
}
_ => Ok(aggr_expr.clone()),
})
.collect::<Result<Vec<_>>>()?;

// construct the inner AggrPlan
let inner_fields = inner_group_exprs
.iter()
.map(|expr| expr.to_field(input.schema()))
.collect::<Result<Vec<_>>>()?;
let inner_schema = DFSchema::new_with_metadata(
inner_fields,
input.schema().metadata().clone(),
)?;
let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new(
input.clone(),
inner_group_exprs,
Vec::new(),
)?);

Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new(
let outer_fields = outer_group_exprs
.iter()
.chain(new_aggr_exprs.iter())
.map(|expr| expr.to_field(&inner_schema))
.collect::<Result<Vec<_>>>()?;
let outer_aggr_schema = Arc::new(DFSchema::new_with_metadata(
outer_fields,
input.schema().metadata().clone(),
)?);

// so the aggregates are displayed in the same way even after the rewrite
// this optimizer has two kinds of alias:
// - group_by aggr
// - aggr expr
let group_size = group_expr.len();
let alias_expr = out_group_expr_with_alias
.into_iter()
.map(|(group_expr, original_field)| {
if let Some(name) = original_field {
group_expr.alias(name)
} else {
group_expr
}
})
.chain(new_aggr_exprs.iter().enumerate().map(|(idx, expr)| {
let idx = idx + group_size;
let name = fields[idx].qualified_name();
columnize_expr(expr.clone().alias(name), &outer_aggr_schema)
}))
.collect();

let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new(
Arc::new(inner_agg),
outer_group_exprs,
outer_aggr_exprs,
new_aggr_exprs,
)?);

Ok(Some(LogicalPlan::Projection(Projection::try_new(
alias_expr,
Arc::new(outer_aggr),
)?)))
} else {
Ok(None)
Expand Down Expand Up @@ -255,9 +299,10 @@ mod tests {
.build()?;

// Should work
let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.b)]] [COUNT(DISTINCT test.b):Int64;N]\
\n Aggregate: groupBy=[[test.b AS alias1]], aggr=[[]] [alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Projection: COUNT(alias1) AS COUNT(DISTINCT test.b) [COUNT(DISTINCT test.b):Int64;N]\
\n Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]] [COUNT(alias1):Int64;N]\
\n Aggregate: groupBy=[[test.b AS alias1]], aggr=[[]] [alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(&plan, expected)
}
Expand Down Expand Up @@ -328,9 +373,10 @@ mod tests {
.aggregate(Vec::<Expr>::new(), vec![count_distinct(lit(2) * col("b"))])?
.build()?;

let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT Int32(2) * test.b)]] [COUNT(DISTINCT Int32(2) * test.b):Int64;N]\
\n Aggregate: groupBy=[[Int32(2) * test.b AS alias1]], aggr=[[]] [alias1:Int32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Projection: COUNT(alias1) AS COUNT(DISTINCT Int32(2) * test.b) [COUNT(DISTINCT Int32(2) * test.b):Int64;N]\
\n Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]] [COUNT(alias1):Int64;N]\
\n Aggregate: groupBy=[[Int32(2) * test.b AS alias1]], aggr=[[]] [alias1:Int32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(&plan, expected)
}
Expand All @@ -344,9 +390,10 @@ mod tests {
.build()?;

// Should work
let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.b)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N]\
\n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Projection: test.a, COUNT(alias1) AS COUNT(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):Int64;N]\
\n Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1)]] [a:UInt32, COUNT(alias1):Int64;N]\
\n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(&plan, expected)
}
Expand Down Expand Up @@ -389,9 +436,10 @@ mod tests {
)?
.build()?;
// Should work
let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\
\n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Projection: test.a, COUNT(alias1) AS COUNT(DISTINCT test.b), MAX(alias1) AS MAX(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):Int64;N, MAX(DISTINCT test.b):UInt32;N]\
\n Aggregate: groupBy=[[test.a]], aggr=[[COUNT(alias1), MAX(alias1)]] [a:UInt32, COUNT(alias1):Int64;N, MAX(alias1):UInt32;N]\
\n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(&plan, expected)
}
Expand Down Expand Up @@ -423,9 +471,10 @@ mod tests {
.build()?;

// Should work
let expected = "Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT test.c)]] [group_alias_0:Int32, COUNT(DISTINCT test.c):Int64;N]\
\n Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Projection: group_alias_0 AS test.a + Int32(1), COUNT(alias1) AS COUNT(DISTINCT test.c) [test.a + Int32(1):Int32, COUNT(DISTINCT test.c):Int64;N]\
\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
\n Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(&plan, expected)
}
Expand Down
33 changes: 28 additions & 5 deletions datafusion/sqllogictest/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3823,21 +3823,44 @@ query TT
EXPLAIN SELECT SUM(DISTINCT CAST(x AS DOUBLE)), MAX(DISTINCT CAST(x AS DOUBLE)) FROM t1 GROUP BY y;
----
logical_plan
Projection: SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)
--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x)]]
Projection: SUM(alias1) AS SUM(DISTINCT t1.x), MAX(alias1) AS MAX(DISTINCT t1.x)
--Aggregate: groupBy=[[t1.y]], aggr=[[SUM(alias1), MAX(alias1)]]
----Aggregate: groupBy=[[t1.y, CAST(t1.x AS Float64)t1.x AS t1.x AS alias1]], aggr=[[]]
------Projection: CAST(t1.x AS Float64) AS CAST(t1.x AS Float64)t1.x, t1.y
--------TableScan: t1 projection=[x, y]
physical_plan
ProjectionExec: expr=[SUM(DISTINCT t1.x)@1 as SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)@2 as MAX(DISTINCT t1.x)]
--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX(DISTINCT t1.x)]
--AggregateExec: mode=FinalPartitioned, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
----CoalesceBatchesExec: target_batch_size=2
------RepartitionExec: partitioning=Hash([y@0], 8), input_partitions=8
--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(DISTINCT t1.x), MAX(DISTINCT t1.x)]
--------AggregateExec: mode=Partial, gby=[y@0 as y], aggr=[SUM(alias1), MAX(alias1)]
----------AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, alias1@1 as alias1], aggr=[]
------------CoalesceBatchesExec: target_batch_size=2
--------------RepartitionExec: partitioning=Hash([y@0, alias1@1], 8), input_partitions=8
----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
------------------AggregateExec: mode=Partial, gby=[y@1 as y, CAST(t1.x AS Float64)t1.x@0 as alias1], aggr=[]
--------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y]
----------------------MemoryExec: partitions=1, partition_sizes=[1]

statement ok
drop table t1

# Reproducer for https://github.com/apache/arrow-datafusion/issues/8175

statement ok
create table t1(state string, city string, min_temp float, area int, time timestamp) as values
('MA', 'Boston', 70.4, 1, 50),
('MA', 'Bedford', 71.59, 2, 150);

query RI
select date_part('year', time) as bla, count(distinct state) as count from t1 group by bla;
----
1970 1

query PI
select date_bin(interval '1 year', time) as bla, count(distinct state) as count from t1 group by bla;
----
1970-01-01T00:00:00 1

statement ok
drop table t1
47 changes: 25 additions & 22 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1361,29 +1361,31 @@ from join_t1
inner join join_t2 on join_t1.t1_id = join_t2.t2_id
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT join_t1.t1_id)]]
--Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
----Projection: join_t1.t1_id
------Inner Join: join_t1.t1_id = join_t2.t2_id
--------TableScan: join_t1 projection=[t1_id]
--------TableScan: join_t2 projection=[t2_id]
Projection: COUNT(alias1) AS COUNT(DISTINCT join_t1.t1_id)
--Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]]
----Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
------Projection: join_t1.t1_id
--------Inner Join: join_t1.t1_id = join_t2.t2_id
----------TableScan: join_t1 projection=[t1_id]
----------TableScan: join_t2 projection=[t2_id]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[COUNT(DISTINCT join_t1.t1_id)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(DISTINCT join_t1.t1_id)]
------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
--------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
----------ProjectionExec: expr=[t1_id@0 as t1_id]
------------CoalesceBatchesExec: target_batch_size=2
--------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
----------------CoalesceBatchesExec: target_batch_size=2
------------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
--------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------------MemoryExec: partitions=1, partition_sizes=[1]
----------------CoalesceBatchesExec: target_batch_size=2
------------------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
--------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------------MemoryExec: partitions=1, partition_sizes=[1]
ProjectionExec: expr=[COUNT(alias1)@0 as COUNT(DISTINCT join_t1.t1_id)]
--AggregateExec: mode=Final, gby=[], aggr=[COUNT(alias1)]
----CoalescePartitionsExec
------AggregateExec: mode=Partial, gby=[], aggr=[COUNT(alias1)]
--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
----------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
------------ProjectionExec: expr=[t1_id@0 as t1_id]
--------------CoalesceBatchesExec: target_batch_size=2
----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
------------------CoalesceBatchesExec: target_batch_size=2
--------------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------------MemoryExec: partitions=1, partition_sizes=[1]
------------------CoalesceBatchesExec: target_batch_size=2
--------------------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------------MemoryExec: partitions=1, partition_sizes=[1]

statement ok
set datafusion.explain.logical_plan_only = true;
Expand Down Expand Up @@ -3407,3 +3409,4 @@ set datafusion.optimizer.prefer_existing_sort = false;

statement ok
drop table annotated_data;

10 changes: 5 additions & 5 deletions datafusion/sqllogictest/test_files/tpch/q16.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ limit 10;
logical_plan
Limit: skip=0, fetch=10
--Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type ASC NULLS LAST, part.p_size ASC NULLS LAST, fetch=10
----Projection: part.p_brand, part.p_type, part.p_size, COUNT(DISTINCT partsupp.ps_suppkey) AS supplier_cnt
------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size]], aggr=[[COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)]]
----Projection: part.p_brand, part.p_type, part.p_size, COUNT(alias1) AS supplier_cnt
------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size]], aggr=[[COUNT(alias1)]]
--------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size, partsupp.ps_suppkey AS alias1]], aggr=[[]]
----------LeftAnti Join: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey
------------Projection: partsupp.ps_suppkey, part.p_brand, part.p_type, part.p_size
Expand All @@ -69,11 +69,11 @@ physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST], fetch=10
----SortExec: TopK(fetch=10), expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
------ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size, COUNT(DISTINCT partsupp.ps_suppkey)@3 as supplier_cnt]
--------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(DISTINCT partsupp.ps_suppkey)]
------ProjectionExec: expr=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size, COUNT(alias1)@3 as supplier_cnt]
--------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(alias1)]
----------CoalesceBatchesExec: target_batch_size=8192
------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, p_size@2], 4), input_partitions=4
--------------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(DISTINCT partsupp.ps_suppkey)]
--------------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size], aggr=[COUNT(alias1)]
----------------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size, alias1@3 as alias1], aggr=[]
------------------CoalesceBatchesExec: target_batch_size=8192
--------------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, p_size@2, alias1@3], 4), input_partitions=4
Expand Down

0 comments on commit abb2ae7

Please sign in to comment.