Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34581][SQL] Don't optimize out grouping expressions from aggregate expressions without aggregate function #32396

Conversation

peter-toth
Copy link
Contributor

What changes were proposed in this pull request?

This PR adds a new rule PullOutGroupingExpressions to pull out complex grouping expressions to a Project node under an Aggregate. These expressions are then referenced in both grouping expressions and aggregate expressions without aggregate functions to ensure that optimization rules don't change the aggregate expressions to invalid ones that no longer refer to any grouping expressions.

Why are the changes needed?

If aggregate expressions (without aggregate functions) in an Aggregate node are complex then the Optimizer can optimize out grouping expressions from them and so making aggregate expressions invalid.

Here is a simple example:

SELECT not(t.id IS NULL) , count(*)
FROM t
GROUP BY t.id IS NULL

In this case the BooleanSimplification rule does this:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
!Aggregate [isnull(id#222)], [NOT isnull(id#222) AS (NOT (id IS NULL))#226, count(1) AS c#224L]   Aggregate [isnull(id#222)], [isnotnull(id#222) AS (NOT (id IS NULL))#226, count(1) AS c#224L]
 +- Project [value#219 AS id#222]                                                                 +- Project [value#219 AS id#222]
    +- LocalRelation [value#219]                                                                     +- LocalRelation [value#219]                                          

where NOT isnull(id#222) is optimized to isnotnull(id#222) and so it no longer refers to any grouping expression.

Before this PR:

== Optimized Logical Plan ==
Aggregate [isnull(id#222)], [isnotnull(id#222) AS (NOT (id IS NULL))#234, count(1) AS c#232L]
+- Project [value#219 AS id#222]
   +- LocalRelation [value#219]

and running the query throws an error:

Couldn't find id#222 in [isnull(id#222)#230,count(1)#226L]
java.lang.IllegalStateException: Couldn't find id#222 in [isnull(id#222)#230,count(1)#226L]

After this PR:

== Optimized Logical Plan ==
Aggregate [_groupingexpression#233], [NOT _groupingexpression#233 AS (NOT (id IS NULL))#230, count(1) AS c#228L]
+- Project [isnull(value#219) AS _groupingexpression#233]
   +- LocalRelation [value#219]

and the query works.

Does this PR introduce any user-facing change?

Yes, the query works.

How was this patch tested?

Added new UT.

@peter-toth
Copy link
Contributor Author

@sigmod, @cloud-fan this is the alternative PR to #31913

@@ -405,14 +407,6 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper {
val arrayAggRel = relation.groupBy(
CreateArray(Seq('nullable_id)))(GetArrayItem(CreateArray(Seq('nullable_id)), 0))
checkRule(arrayAggRel, arrayAggRel)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be removed now. It is optimized to:

Aggregate [*id#0L], [CASE WHEN (0 = *id#0L) THEN (*id#0L + 1) END AS a#0L]
+- LocalRelation <empty>, [*id#0L, nullable_id#0L]

case a: Aggregate if a.resolved =>
val complexGroupingExpressionMap = mutable.LinkedHashMap.empty[Expression, NamedExpression]
val newGroupingExpressions = a.groupingExpressions
.filterNot(AggregateExpression.containsAggregate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this needed? IIUC the analyzer guarantees that grouping expression can't contain aggregate expressions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you are right. I run some experiments to make this rule part of the Analyzer as PullOutNondeterministic is, but realized that it I would require more changes and reverted. I forgot to remove this. Fixed in: ba2f0c7

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seem like you only removed it from the following map method, but this filterNot can also be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, thanks. I made some other mistakes too in that commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be ok after bfb85de

val newGroupingExpressions = a.groupingExpressions
.filterNot(AggregateExpression.containsAggregate)
.map {
case e if AggregateExpression.isAggregate(e) => e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ba2f0c7.

@cloud-fan
Copy link
Contributor

The code change looks good. Can we run a TPCDS benchmark to make sure there is no perf regression?

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42601/

@peter-toth
Copy link
Contributor Author

The code change looks good. Can we run a TPCDS benchmark to make sure there is no perf regression?

I will run it and post the results soon.

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42605/

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42605/

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Test build #138081 has finished for PR 32396 at commit 5a6367b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 29, 2021

Test build #138085 has finished for PR 32396 at commit ba2f0c7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42637/

@SparkQA
Copy link

SparkQA commented Apr 30, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42637/

@SparkQA
Copy link

SparkQA commented May 1, 2021

Test build #138116 has finished for PR 32396 at commit bfb85de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM if TPCDS result shows no perf regression

Copy link
Contributor

@sigmod sigmod left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Peter!
LGTM -- I just have one minor comment.

}

val newAggregateExpressions = a.aggregateExpressions
.map(replaceComplexGroupingExpressions(_).asInstanceOf[NamedExpression])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to do a manual tree traversal if we want to stop recursion earlier, e.g. case _ if AggregateExpression.isAggregate(e) => e

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the following one work?

a.transformExpressionsWithPruning(e => !(AggregateExpression.isAggregate(e) || e.fordable)) {
....
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yes, this could work with some explicit casting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this would traverse on a.groupingExpressions too which is not needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this would traverse on a.groupingExpressions too which is not needed.

You're right. I think the following would behave the same as the manual recursion:

a.aggregateExpressions.map(_.transformWithPruning(e => !(AggregateExpression.isAggregate(e) || e.fordable))({
// the first two original case branches can be skipped here.
.....
}).asInstanceOf....)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, it's just my small preference -- it seems neater to use framework functions if it works. Feel free to merge whatever you feel comfortable with.

Copy link
Contributor Author

@peter-toth peter-toth May 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm leaving this PR as it is now.

But tested that peter-toth@ed374fe could work, just I need to cast TreePatternBits to Expression.
Although, I wonder if it would make sense to split plan and expression pruning in the future like this: peter-toth@d817fc7 and so this pruning (and probably there are other similar use cases where we want to stop traversal) became simpler: peter-toth@d817fc7#diff-57201016f79912c165715811d7f7f37e2acbef2ae7b241c3c8a0b928d0052eb5R61

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for exploring this, Peter! I'll think more about such use cases.

@peter-toth
Copy link
Contributor Author

peter-toth commented May 1, 2021

LGTM if TPCDS result shows no perf regression

Running it, will post the results today.

I forgot to make PullOutGroupingExpressions non-excludable. Fixed it in fcc4005

@SparkQA
Copy link

SparkQA commented May 1, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42646/

@SparkQA
Copy link

SparkQA commented May 1, 2021

Test build #138125 has finished for PR 32396 at commit fcc4005.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth
Copy link
Contributor Author

TPCDS benchmark on scaleFactor=5 data looks good, no significant change: TPCDSQueryBenchmark-results.txt

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in cfc0495 May 2, 2021
@peter-toth
Copy link
Contributor Author

Thanks all for the review.

@maropu
Copy link
Member

maropu commented May 4, 2021

NOTE: rather, it seems this change's improved TPCDS performance, e.g., 250230 => 228723 in q23a (sf=20). Nice work, @peter-toth

@peter-toth
Copy link
Contributor Author

Thanks @maropu for the extended test.

wangyum added a commit that referenced this pull request May 26, 2023
… expressions from aggregate expressions without aggregate function (#941)

* [SPARK-34581][SQL] Don't optimize out grouping expressions from aggregate expressions without aggregate function

### What changes were proposed in this pull request?
This PR adds a new rule `PullOutGroupingExpressions` to pull out complex grouping expressions to a `Project` node under an `Aggregate`. These expressions are then referenced in both grouping expressions and aggregate expressions without aggregate functions to ensure that optimization rules don't change the aggregate expressions to invalid ones that no longer refer to any grouping expressions.

### Why are the changes needed?
If aggregate expressions (without aggregate functions) in an `Aggregate` node are complex then the `Optimizer` can optimize out grouping expressions from them and so making aggregate expressions invalid.

Here is a simple example:
```
SELECT not(t.id IS NULL) , count(*)
FROM t
GROUP BY t.id IS NULL
```
In this case the `BooleanSimplification` rule does this:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
!Aggregate [isnull(id#222)], [NOT isnull(id#222) AS (NOT (id IS NULL))#226, count(1) AS c#224L]   Aggregate [isnull(id#222)], [isnotnull(id#222) AS (NOT (id IS NULL))#226, count(1) AS c#224L]
 +- Project [value#219 AS id#222]                                                                 +- Project [value#219 AS id#222]
    +- LocalRelation [value#219]                                                                     +- LocalRelation [value#219]
```
where `NOT isnull(id#222)` is optimized to `isnotnull(id#222)` and so it no longer refers to any grouping expression.

Before this PR:
```
== Optimized Logical Plan ==
Aggregate [isnull(id#222)], [isnotnull(id#222) AS (NOT (id IS NULL))#234, count(1) AS c#232L]
+- Project [value#219 AS id#222]
   +- LocalRelation [value#219]
```
and running the query throws an error:
```
Couldn't find id#222 in [isnull(id#222)#230,count(1)#226L]
java.lang.IllegalStateException: Couldn't find id#222 in [isnull(id#222)#230,count(1)#226L]
```

After this PR:
```
== Optimized Logical Plan ==
Aggregate [_groupingexpression#233], [NOT _groupingexpression#233 AS (NOT (id IS NULL))#230, count(1) AS c#228L]
+- Project [isnull(value#219) AS _groupingexpression#233]
   +- LocalRelation [value#219]
```
and the query works.

### Does this PR introduce _any_ user-facing change?
Yes, the query works.

### How was this patch tested?
Added new UT.

Closes #32396 from peter-toth/SPARK-34581-keep-grouping-expressions-2.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

(cherry picked from commit cfc0495)

* [SPARK-34037][SQL] Remove unnecessary upcasting for Avg & Sum which handle by themself internally

### What changes were proposed in this pull request?
The type-coercion for numeric types of average and sum is not necessary at all, as the resultType and sumType can prevent the overflow.

### Why are the changes needed?

rm unnecessary logic which may cause potential performance regressions

### Does this PR introduce _any_ user-facing change?

no
### How was this patch tested?

tpcds tests for plan

Closes #31079 from yaooqinn/SPARK-34037.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>

(cherry picked from commit a235c3b)

Co-authored-by: Peter Toth <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants