Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the performance queries with a single distinct aggregate #1315

Merged
merged 4 commits into from
Nov 19, 2021

Conversation

ic4y
Copy link
Contributor

@ic4y ic4y commented Nov 16, 2021

Which issue does this PR close?

resolves #1282
related to #1312

Rationale for this change

Improve the performance of single_distinct_agg

a single distinct aggregation optimization method as follows:

- Aggregation
       GROUP BY (k)
       F1(DISTINCT s0, s1, ...),
       F2(DISTINCT s0, s1, ...),
    - X
into
- Aggregation
         GROUP BY (k)
         F1(x)
         F2(x)
     - Aggregation
            GROUP BY (k, s0, s1, ...)
         - X

I used a test data set of 60 million to test datafunshion before and after using the optimizer. After optimization, the performance has double improvement and the execution time has been reduced from 12 seconds to 6 seconds.
The test results and the logical plan before and after optimization are as follows:

sql : select count(distinct LO_EXTENDEDPRICE) from lineorder_flat;

------------------original---------------------
Display: Projection: #COUNT(DISTINCT lineorder_flat.LO_EXTENDEDPRICE) [COUNT(DISTINCT lineorder_flat.LO_EXTENDEDPRICE):UInt64;N]
  Aggregate: groupBy=[[]], aggr=[[COUNT(DISTINCT #lineorder_flat.LO_EXTENDEDPRICE)]] [COUNT(DISTINCT lineorder_flat.LO_EXTENDEDPRICE):UInt64;N]
    TableScan: lineorder_flat projection=Some([9]) [LO_EXTENDEDPRICE:Int64]
+-------------------------------------------------+
| COUNT(DISTINCT lineorder_flat.LO_EXTENDEDPRICE) |
+-------------------------------------------------+
| 1040570                                         |
+-------------------------------------------------+
usage millis: 12033

----------------after optimization-------------
Display: Projection: #COUNT(lineorder_flat.LO_EXTENDEDPRICE) [COUNT(lineorder_flat.LO_EXTENDEDPRICE):UInt64;N]
  Aggregate: groupBy=[[]], aggr=[[COUNT(#lineorder_flat.LO_EXTENDEDPRICE)]] [COUNT(lineorder_flat.LO_EXTENDEDPRICE):UInt64;N]
    Aggregate: groupBy=[[#lineorder_flat.LO_EXTENDEDPRICE]], aggr=[[]] [LO_EXTENDEDPRICE:Int64]
      TableScan: lineorder_flat projection=Some([9]) [LO_EXTENDEDPRICE:Int64]
+----------------------------------------+
| COUNT(lineorder_flat.LO_EXTENDEDPRICE) |
+----------------------------------------+
| 1040570                                |
+----------------------------------------+
usage millis: 5817

What changes are included in this PR?

Are there any user-facing changes?

nothing

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Nov 16, 2021
schema: _,
group_expr,
} => {
match is_single_agg(plan) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be using if/else.

@Dandandan
Copy link
Contributor

Great idea and results! @ic4y

@Dandandan Dandandan closed this Nov 16, 2021
@Dandandan Dandandan reopened this Nov 16, 2021
@xudong963
Copy link
Member

Thanks for your contribution @ic4y, I'll take a look tomorrow.

Comment on lines 56 to 61
LogicalPlan::Aggregate {
input,
aggr_expr,
schema: _,
group_expr,
} => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LogicalPlan::Aggregate {
input,
aggr_expr,
schema: _,
group_expr,
} => {
LogicalPlan::Aggregate {
input,
aggr_expr,
group_expr,
..
} => {

fn is_single_agg(plan: &LogicalPlan) -> bool {
match plan {
LogicalPlan::Aggregate {
input: _,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, you can also check other places

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks i fixed it

} => {
match is_single_agg(plan) {
true => {
let mut all_group_args: Vec<Expr> = Vec::new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut all_group_args: Vec<Expr> = Vec::new();
let mut all_group_args = Vec::with_capacity(group_expr.len());

// remove distinct and collection args
let mut new_aggr_expr = aggr_expr
.iter()
.map(|aggfunc| match aggfunc {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggfunc is still an Expr, so it's better to have a name with expr not func

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a question: if all exprs in aggr_expr are Expr::AggregateFunction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because there is judgment in is_single_distinct_agg()

aggr_expr: Vec::new(),
schema: grouped_schema,
};
let mut expres = group_expr.clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: expres ?

Comment on lines 99 to 108
let expr = plan.expressions();
// apply the optimization to all inputs of the plan
let inputs = plan.inputs();

let new_inputs = inputs
.iter()
.map(|plan| optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is redundant from the 113~119 lines, so if we can eliminate duplicate code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks i fixed it

@xudong963
Copy link
Member

BTW, some tests can't pass due to distinct -> group by

@ic4y
Copy link
Contributor Author

ic4y commented Nov 17, 2021

@xudong963 Thank you for reviewing!I prioritize the problem of failing the tests

@alamb alamb changed the title add single_distinct_to_group_by optimizer rule Optimize the performance queries with a single distinct aggregate Nov 17, 2021
@alamb
Copy link
Contributor

alamb commented Nov 17, 2021

Thanks @ic4y -- this looks really cool. I plan to review this PR carefully shortly

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @ic4y ❤️ I went through this carefully, and I think it could be merged. This is a very nice first contribution,. I left a few stylistic comments and it looks like there are some test failures (looks like some output needs to be updated).

Also, I think this transformation is also valid for multiple distinct aggregates (if they share the same argument ). For example

SELECT F1(DISTINCT s), F2(DISTINCT s), k
...  
GROUP BY k 

Rewritten to

SELECT F1(s), F2(s)
FROM (
  SELECT s, k ... GROUP BY s, k
) 
GROUP BY k

Comment on lines 41 to 42
/// </pre>
/// <p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These and <p> tags seem out of place.

I think you could use something like

///   ```text
///       - Aggregation
///            GROUP BY (k)
///            F1(s)
///       - Aggregation
///               GROUP BY (k, s)
///            - X
///   ```

If you wanted to use monospaced fonts to illustrate the transformation

use std::sync::Arc;

/// single distinct to group by optimizer rule
/// - Aggregation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is another way to display this transformation

SELECT F1(DISTINCT s) 
...  
GROUP BY k 

Rewritten to

SELECT F1(s) 
FROM (
  SELECT s, k ... GROUP BY s, k
) 
GROUP BY k

Comment on lines 61 to 62
let mut all_group_args = Vec::with_capacity(group_expr.len());
all_group_args.append(&mut group_expr.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut all_group_args = Vec::with_capacity(group_expr.len());
all_group_args.append(&mut group_expr.clone());
let mut all_group_args = group_expr.clone();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this really matters, but I figured I would point out a slightly shorter way to do the same thing.

.iter()
.map(|agg_expr| match agg_expr {
Expr::AggregateFunction { fun, args, .. } => {
all_group_args.append(&mut args.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise here you can do something like this (and avoid a mut):

Suggested change
all_group_args.append(&mut args.clone());
all_group_args.extend(args.iter().cloned());

input: Arc::new(grouped_agg.unwrap()),
aggr_expr: new_aggr_expr,
schema: final_agg_schema.clone(),
group_expr: group_expr.clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
group_expr: group_expr.clone(),
group_expr,

group_expr: group_expr.clone(),
};

let mut alias_expr: Vec<Expr> = Vec::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might help here to explain the rationale for adding this alias (so the aggregates are displayed in the same way even after the rewrite). It is important but may not be obvious to other readers

}

#[test]
fn distinct_and_common() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@ic4y
Copy link
Contributor Author

ic4y commented Nov 18, 2021

@alamb Thank you for your help.
I updated these and supports multiple distinct aggregates that use the same argument

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great @ic4y -- I started CI and when it passes I'll merge this one in. 🎉

})
.count()
== aggr_expr.len()
&& fields_set.len() == 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

)?
.build()?;
// Should work
let expected = "Projection: #test.a AS a, #COUNT(test.b) AS COUNT(DISTINCT test.b), #MAX(test.b) AS MAX(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):UInt64;N, MAX(DISTINCT test.b):UInt32;N]\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

@Dandandan Dandandan merged commit a60cdb0 into apache:master Nov 19, 2021
@Dandandan
Copy link
Contributor

Thanks for this great contribution 🎉

@houqp houqp added the performance Make DataFusion faster label Nov 20, 2021
@houqp
Copy link
Member

houqp commented Nov 22, 2021

Very cool optimization, thanks @ic4y !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate performance Make DataFusion faster
Projects
None yet
Development

Successfully merging this pull request may close these issues.

better performance improvement by optimizing a single distinct aggregation scene
5 participants