[SPARK-13749][SQL] Faster pivot implementation for many distinct values with two phase aggregation #11583
Conversation
…ons and at best doesn't do much data reduction.
# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
Test build #52681 has finished for PR 11583 at commit
Test build #52682 has finished for PR 11583 at commit
@aray Thank you for working on it. The results look very cool! I may not have time to review this PR this week. I will try to find time next week to take a look.
@yhuai do you have time this week to look at this patch?
Sorry. I will review this one this week.
override lazy val inputAggBufferAttributes: Seq[AttributeReference] =
  aggBufferAttributes.map(_.newInstance())

override lazy val inputTypes: Seq[AbstractDataType] = children.map(_.dataType)
How about we use inputTypes to ask the analyzer to do type casting. So, if there is a value column that has an invalid data type, the analyzer will complain.
I'm not sure what you mean by this, but no casting is needed.
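For context on the mechanism the reviewer is pointing at: a Catalyst expression that declares inputTypes (via the ExpectsInputTypes trait) gets its children checked by the analyzer, which reports a type error on mismatch; mixing in ImplicitCastInputTypes would additionally insert casts. A minimal sketch against the Catalyst internals of that era; DoubleIt is a made-up expression, not part of this PR:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpectsInputTypes, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{AbstractDataType, DataType, DoubleType}

// Hypothetical expression: declaring inputTypes lets the analyzer reject
// (or, with ImplicitCastInputTypes, coerce) a child of the wrong type,
// instead of the expression failing at execution time.
case class DoubleIt(child: Expression) extends UnaryExpression
    with ExpectsInputTypes with CodegenFallback {
  override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
  override def dataType: DataType = DoubleType
  override protected def nullSafeEval(input: Any): Any =
    input.asInstanceOf[Double] * 2
}
```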
* Remove threshold of 10 for pivot values in Analyzer
* Change updateFunction into a partial function so support can be checked without try/catch (sketched below)
* Scaladoc for PivotFirst
* Move children, inputTypes, nullable, and dataType to beginning
* Added comments
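The partial-function change in the second bullet refers to a pattern like the following. This is a sketch only: Array[Any] stands in for the real aggregation buffer, and the type cases are illustrative rather than the PR's exact list.

```scala
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType}

// Sketch: one update routine per supported value data type. Because this is
// a PartialFunction, callers can probe support with isDefinedAt rather than
// invoking it and catching a MatchError in a try/catch.
val updateFunction: PartialFunction[DataType, (Array[Any], Int, Any) => Unit] = {
  case DoubleType  => (buf, i, v) => buf(i) = v.asInstanceOf[Double]
  case IntegerType => (buf, i, v) => buf(i) = v.asInstanceOf[Int]
  case LongType    => (buf, i, v) => buf(i) = v.asInstanceOf[Long]
}

def supportsDataType(dt: DataType): Boolean = updateFunction.isDefinedAt(dt)
```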
# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
Test build #56095 has finished for PR 11583 at commit
Test build #56103 has finished for PR 11583 at commit
@yhuai I've addressed all your comments, ready for you to take another look. Sorry for the delay.
@yhuai can we get this merged for 2.0?
test this please
override lazy val aggBufferAttributes: Seq[AttributeReference] =
  pivotIndex.toList.sortBy(_._2).map(kv => AttributeReference(kv._1.toString, valueDataType)())
How about we avoid using lazy val for aggBufferAttributes, aggBufferSchema, and inputAggBufferAttributes?
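The follow-up PR (#12861, below) made exactly this change. Since these sequences are fully determined by the constructor arguments, plain vals avoid the per-access initialization check and synchronization that lazy vals carry. Roughly:

```scala
// Before (this PR): initialized on first access, with lazy-val locking.
override lazy val aggBufferAttributes: Seq[AttributeReference] =
  pivotIndex.toList.sortBy(_._2).map(kv => AttributeReference(kv._1.toString, valueDataType)())

// After (follow-up #12861): computed eagerly at construction time.
override val aggBufferAttributes: Seq[AttributeReference] =
  pivotIndex.toList.sortBy(_._2).map(kv => AttributeReference(kv._1.toString, valueDataType)())
```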
@aray This PR looks good. I will merge this after it passes tests. Can you send out a follow-up PR to address my comments?
Sure, will do tonight.
Test build #57537 has finished for PR 11583 at commit
Merging to master and 2.0 branch.
…es with two phase aggregation

Author: Andrew Ray <[email protected]>
Closes #11583 from aray/fast-pivot.
(cherry picked from commit 9927441)
Signed-off-by: Yin Huai <[email protected]>
…stinct values with two phase aggregation

What changes were proposed in this pull request?
This is a follow-up PR for #11583. It makes 3 lazy vals into just vals and adds unit test coverage.

How was this patch tested?
Existing unit tests and additional unit tests.

Author: Andrew Ray <[email protected]>
Closes #12861 from aray/fast-pivot-follow-up.
(cherry picked from commit d8f528c)
Signed-off-by: Yin Huai <[email protected]>
What changes were proposed in this pull request?
The existing implementation of pivot translates into a single aggregation with one aggregate per distinct pivot value. When the number of distinct pivot values is large (say 1000+), this can get extremely slow, since every input value is evaluated by every aggregate even though it affects the value of only one of them.
I'm proposing an alternate strategy for when there are 10+ (a somewhat arbitrary threshold) distinct pivot values. We do two phases of aggregation. In the first, we group by the grouping columns plus the pivot column and perform the specified aggregations (one or sometimes more). In the second, we group by the grouping columns and use the new (non-public) PivotFirst aggregate, which rearranges the outputs of the first aggregation into an array indexed by the pivot value. Finally, we do a project to extract the array entries into the appropriate output columns.
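To make the strategy concrete, here is a hand-written approximation of the rewrite in DataFrame terms. The sales/year/course/earnings names are illustrative, and first(when(...)) stands in for the internal PivotFirst aggregate; it is cheap in the second phase because phase 1 has already reduced the data:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Hypothetical schema: sales(year, course, earnings); we pivot on "course".
def twoPhasePivot(df: DataFrame, courses: Seq[String]): DataFrame = {
  // Phase 1: group by the grouping column(s) PLUS the pivot column, so each
  // input row is evaluated by exactly one aggregate.
  val phase1 = df.groupBy(col("year"), col("course"))
    .agg(sum(col("earnings")).as("earnings"))

  // Phase 2: group by the grouping column(s) alone and route each course's
  // pre-aggregated value into its own output column (the real implementation
  // fills an array indexed by pivot value via PivotFirst, then projects).
  val pivotCols = courses.map { c =>
    first(when(col("course") === c, col("earnings")), ignoreNulls = true).as(c)
  }
  phase1.groupBy(col("year")).agg(pivotCols.head, pivotCols.tail: _*)
}
```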
How was this patch tested?
Additional unit tests in DataFramePivotSuite and manual larger-scale testing.
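For flavor, a query of roughly this shape exercises the pivot path; the data and column names are illustrative, patterned after typical pivot examples rather than copied from the suite:

```scala
import spark.implicits._
import org.apache.spark.sql.functions.sum

// Illustrative data: earnings per course per year.
val df = Seq(
  (2012, "dotNET", 10000), (2012, "Java", 20000),
  (2013, "dotNET", 48000), (2013, "Java", 30000)
).toDF("year", "course", "earnings")

// One output column per pivot value; with many distinct pivot values this
// is the case the two-phase strategy speeds up.
df.groupBy("year")
  .pivot("course", Seq("dotNET", "Java"))
  .agg(sum($"earnings"))
  .show()
// +----+------+-----+
// |year|dotNET| Java|
// +----+------+-----+
// |2012| 10000|20000|
// |2013| 48000|30000|
// +----+------+-----+
```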