Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop copying LogicalPlan and Exprs in SingleDistinctToGroupBy #10527

136 changes: 70 additions & 66 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use std::sync::Arc;
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::{qualified_name, Result};
use datafusion_common::{
internal_err, qualified_name, tree_node::Transformed, DataFusionError, Result,
};
use datafusion_expr::builder::project;
use datafusion_expr::expr::AggregateFunctionDefinition;
use datafusion_expr::{
Expand Down Expand Up @@ -64,9 +66,7 @@ impl SingleDistinctToGroupBy {
}

/// Check whether all aggregate exprs are distinct on a single field.
fn is_single_distinct_agg(plan: &LogicalPlan) -> Result<bool> {
match plan {
LogicalPlan::Aggregate(Aggregate { aggr_expr, .. }) => {
fn is_single_distinct_agg(aggr_expr: &[Expr]) -> Result<bool> {
let mut fields_set = HashSet::new();
let mut aggregate_count = 0;
for expr in aggr_expr {
Expand All @@ -85,7 +85,7 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> Result<bool> {
aggregate_count += 1;
if *distinct {
for e in args {
fields_set.insert(e.canonical_name());
fields_set.insert(e);
}
} else if !matches!(fun, Sum | Min | Max) {
return Ok(false);
Expand All @@ -105,12 +105,9 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> Result<bool> {
aggregate_count += 1;
if *distinct {
for e in args {
fields_set.insert(e.canonical_name());
fields_set.insert(e);
}
} else if fun.name() != "SUM"
&& fun.name() != "MIN"
&& fun.name() != "MAX"
{
} else if fun.name() != "SUM" && fun.name() != "MIN" && fun.name() != "MAX" {
return Ok(false);
}
} else {
Expand All @@ -119,9 +116,6 @@ fn is_single_distinct_agg(plan: &LogicalPlan) -> Result<bool> {
}
Ok(aggregate_count == aggr_expr.len() && fields_set.len() == 1)
}
_ => Ok(false),
}
}

/// Check if the first expr is [Expr::GroupingSet].
fn contains_grouping_set(expr: &[Expr]) -> bool {
Expand All @@ -131,29 +125,51 @@ fn contains_grouping_set(expr: &[Expr]) -> bool {
impl OptimizerRule for SingleDistinctToGroupBy {
fn try_optimize(
&self,
plan: &LogicalPlan,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called SingleDistinctToGroupBy::rewrite")
}

fn name(&self) -> &str {
"single_distinct_aggregation_to_group_by"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
match plan {
LogicalPlan::Aggregate(Aggregate {
input,
aggr_expr,
schema,
group_expr,
..
}) => {
if is_single_distinct_agg(plan)? && !contains_grouping_set(group_expr) {
}) if is_single_distinct_agg(&aggr_expr)?
&& !contains_grouping_set(&group_expr) =>
{
let group_size = group_expr.len();
// alias all original group_by exprs
let (mut inner_group_exprs, out_group_expr_with_alias): (
Vec<Expr>,
Vec<(Expr, Option<String>)>,
) = group_expr
.iter()
.into_iter()
.enumerate()
.map(|(i, group_expr)| {
if let Expr::Column(_) = group_expr {
// For Column expressions we can use existing expression as is.
(group_expr.clone(), (group_expr.clone(), None))
(group_expr.clone(), (group_expr, None))
} else {
// For complex expression write is as alias, to be able to refer
// if from parent operators successfully.
Expand All @@ -173,10 +189,9 @@ impl OptimizerRule for SingleDistinctToGroupBy {
//
// Second aggregate refers to the `test.a + Int32(1)` expression However, its input do not have `test.a` expression in it.
let alias_str = format!("group_alias_{i}");
let alias_expr = group_expr.clone().alias(&alias_str);
let (qualifier, field) = schema.qualified_field(i);
(
alias_expr,
group_expr.alias(alias_str.clone()),
(
col(alias_str),
Some(qualified_name(qualifier, field.name())),
Expand All @@ -186,42 +201,45 @@ impl OptimizerRule for SingleDistinctToGroupBy {
})
.unzip();

// and they can be referenced by the alias in the outer aggr plan
let outer_group_exprs = out_group_expr_with_alias
.iter()
.map(|(out_group_expr, _)| out_group_expr.clone())
.collect::<Vec<_>>();

// replace the distinct arg with alias
let mut index = 1;
let mut group_fields_set = HashSet::new();
let mut inner_aggr_exprs = vec![];
let outer_aggr_exprs = aggr_expr
.iter()
.into_iter()
.map(|aggr_expr| match aggr_expr {
Expr::AggregateFunction(AggregateFunction {
func_def: AggregateFunctionDefinition::BuiltIn(fun),
args,
mut args,
distinct,
..
}) => {
// is_single_distinct_agg ensure args.len=1
if *distinct
&& group_fields_set.insert(args[0].display_name()?)
{
inner_group_exprs.push(
args[0].clone().alias(SINGLE_DISTINCT_ALIAS),
);
if distinct {
if args.len() != 1 {
return internal_err!("DISTINCT aggregate should have exactly one argument");
}
let arg = args.swap_remove(0);

if group_fields_set.insert(arg.display_name()?) {
inner_group_exprs
.push(arg.alias(SINGLE_DISTINCT_ALIAS));
}
Ok(Expr::AggregateFunction(AggregateFunction::new(
fun,
vec![col(SINGLE_DISTINCT_ALIAS)],
false, // intentional to remove distinct here
None,
None,
None,
)))
// if the aggregate function is not distinct, we need to rewrite it like two phase aggregation
if !(*distinct) {
} else {
index += 1;
let alias_str = format!("alias{}", index);
inner_aggr_exprs.push(
Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
args.clone(),
args,
false,
None,
None,
Expand All @@ -230,40 +248,35 @@ impl OptimizerRule for SingleDistinctToGroupBy {
.alias(&alias_str),
);
Ok(Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
fun,
vec![col(&alias_str)],
false,
None,
None,
None,
)))
} else {
Ok(Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
vec![col(SINGLE_DISTINCT_ALIAS)],
false, // intentional to remove distinct here
None,
None,
None,
)))
}
}
_ => Ok(aggr_expr.clone()),
_ => Ok(aggr_expr),
})
.collect::<Result<Vec<_>>>()?;

// construct the inner AggrPlan
let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new(
input.clone(),
input,
inner_group_exprs,
inner_aggr_exprs,
)?);

let outer_group_exprs = out_group_expr_with_alias
.iter()
.map(|(expr, _)| expr.clone())
.collect();

// so the aggregates are displayed in the same way even after the rewrite
// this optimizer has two kinds of alias:
// - group_by aggr
// - aggr expr
let group_size = group_expr.len();
let alias_expr: Vec<_> = out_group_expr_with_alias
.into_iter()
.map(|(group_expr, original_field)| {
Expand All @@ -273,35 +286,26 @@ impl OptimizerRule for SingleDistinctToGroupBy {
group_expr
}
})
.chain(outer_aggr_exprs.iter().enumerate().map(|(idx, expr)| {
.chain(outer_aggr_exprs.iter().cloned().enumerate().map(
|(idx, expr)| {
let idx = idx + group_size;
let (qualifier, field) = schema.qualified_field(idx);
let name = qualified_name(qualifier, field.name());
expr.clone().alias(name)
}))
expr.alias(name)
},
))
.collect();

let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new(
Arc::new(inner_agg),
outer_group_exprs,
outer_aggr_exprs,
)?);
Ok(Some(project(outer_aggr, alias_expr)?))
} else {
Ok(None)
}
Ok(Transformed::yes(project(outer_aggr, alias_expr)?))
}
_ => Ok(None),
_ => Ok(Transformed::no(plan)),
}
}

fn name(&self) -> &str {
"single_distinct_aggregation_to_group_by"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}
}

#[cfg(test)]
Expand Down