-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-38034][SQL] Optimize TransposeWindow rule #35334
[SPARK-38034][SQL] Optimize TransposeWindow rule #35334
Conversation
Thanks for correcting the title @HyukjinKwon and thanks for reviewing it @tanelk, should we find more people to help review it? |
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
Outdated
Show resolved
Hide resolved
cc @hvanhovell FYI |
Gentle ping @hvanhovell :) |
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
@cloud-fan, @hvanhovell, I think this is a pretty good optimization. We have a customer who upgraded from Spark 2 to Spark 3 and due to the new |
@@ -1148,9 +1148,9 @@ object CollapseWindow extends Rule[LogicalPlan] { | |||
*/ | |||
object TransposeWindow extends Rule[LogicalPlan] { | |||
private def compatiblePartitions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = { | |||
ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd consider this as a perf bug...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean we should only optimize the performance without changing the logic?Maybe we could find a way to avoid the permutation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean the previous code is kind of buggy. This PR is more like a bug fix instead of perf improvement and we should backport it.
@constzhou sorry for the late review. Can you rebase this PR to retrigger the tests? |
75bc299
to
0d8e472
Compare
It's OK :) triggered now |
I think the previous O(n!) time complexity code is unexpected and buggy. Let's backport this PR. |
### What changes were proposed in this pull request? Optimize the TransposeWindow rule to extend applicable cases and optimize time complexity. TransposeWindow rule will try to eliminate unnecessary shuffle: but the function compatiblePartitions will only take the first n elements of the window2 partition sequence, for some cases, this will not take effect, like the case below: val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d") df.selectExpr( "sum(`d`) OVER(PARTITION BY `b`,`a`) as e", "sum(`c`) OVER(PARTITION BY `a`) as f" ).explain Current plan == Physical Plan == *(5) Project [e#10L, f#11L] +- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L] +- *(4) Sort [a#2L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#2L, 200), true, [id=#41] +- *(3) Project [a#2L, c#4L, e#10L] +- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L] +- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=#33] +- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L] +- *(1) Range (0, 10, step=1, splits=10) Expected plan: == Physical Plan == *(4) Project [e#924L, f#925L] +- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L] +- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0 +- *(3) Project [d#43L, b#41L, a#40L, f#925L] +- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L] +- *(2) Sort [a#40L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#40L, 200), true, [id=#282] +- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L] +- *(1) Range (0, 10, step=1, splits=10) Also the permutations method has a O(n!) time complexity, which is very expensive when there are many partition columns, we could try to optimize it. ### Why are the changes needed? We could apply the rule for more cases, which could improve the execution performance by eliminate unnecessary shuffle, and by reducing the time complexity from O(n!) to O(n2), the performance for the rule itself could improve ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? UT Closes #35334 from constzhou/SPARK-38034_optimize_transpose_window_rule. Authored-by: xzhou <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 0cc331d) Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? Optimize the TransposeWindow rule to extend applicable cases and optimize time complexity. TransposeWindow rule will try to eliminate unnecessary shuffle: but the function compatiblePartitions will only take the first n elements of the window2 partition sequence, for some cases, this will not take effect, like the case below: val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d") df.selectExpr( "sum(`d`) OVER(PARTITION BY `b`,`a`) as e", "sum(`c`) OVER(PARTITION BY `a`) as f" ).explain Current plan == Physical Plan == *(5) Project [e#10L, f#11L] +- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L] +- *(4) Sort [a#2L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#2L, 200), true, [id=#41] +- *(3) Project [a#2L, c#4L, e#10L] +- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L] +- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=#33] +- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L] +- *(1) Range (0, 10, step=1, splits=10) Expected plan: == Physical Plan == *(4) Project [e#924L, f#925L] +- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L] +- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0 +- *(3) Project [d#43L, b#41L, a#40L, f#925L] +- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L] +- *(2) Sort [a#40L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#40L, 200), true, [id=#282] +- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L] +- *(1) Range (0, 10, step=1, splits=10) Also the permutations method has a O(n!) time complexity, which is very expensive when there are many partition columns, we could try to optimize it. ### Why are the changes needed? We could apply the rule for more cases, which could improve the execution performance by eliminate unnecessary shuffle, and by reducing the time complexity from O(n!) to O(n2), the performance for the rule itself could improve ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? UT Closes #35334 from constzhou/SPARK-38034_optimize_transpose_window_rule. Authored-by: xzhou <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 0cc331d) Signed-off-by: Wenchen Fan <[email protected]>
thanks, merging to master/3.3/3.2! (3.1 has conflicts and I didn't backport) |
Can one of the admins verify this patch? |
### What changes were proposed in this pull request? Optimize the TransposeWindow rule to extend applicable cases and optimize time complexity. TransposeWindow rule will try to eliminate unnecessary shuffle: but the function compatiblePartitions will only take the first n elements of the window2 partition sequence, for some cases, this will not take effect, like the case below: val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d") df.selectExpr( "sum(`d`) OVER(PARTITION BY `b`,`a`) as e", "sum(`c`) OVER(PARTITION BY `a`) as f" ).explain Current plan == Physical Plan == *(5) Project [e#10L, f#11L] +- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L] +- *(4) Sort [a#2L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#2L, 200), true, [id=apache#41] +- *(3) Project [a#2L, c#4L, e#10L] +- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L] +- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=apache#33] +- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L] +- *(1) Range (0, 10, step=1, splits=10) Expected plan: == Physical Plan == *(4) Project [e#924L, f#925L] +- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L] +- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0 +- *(3) Project [d#43L, b#41L, a#40L, f#925L] +- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L] +- *(2) Sort [a#40L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#40L, 200), true, [id=apache#282] +- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L] +- *(1) Range (0, 10, step=1, splits=10) Also the permutations method has a O(n!) time complexity, which is very expensive when there are many partition columns, we could try to optimize it. ### Why are the changes needed? We could apply the rule for more cases, which could improve the execution performance by eliminate unnecessary shuffle, and by reducing the time complexity from O(n!) to O(n2), the performance for the rule itself could improve ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? UT Closes apache#35334 from constzhou/SPARK-38034_optimize_transpose_window_rule. Authored-by: xzhou <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 0cc331d) Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 0f609ff)
What changes were proposed in this pull request?
Optimize the TransposeWindow rule to extend applicable cases and optimize time complexity.
TransposeWindow rule will try to eliminate unnecessary shuffle:
but the function compatiblePartitions will only take the first n elements of the window2 partition sequence, for some cases, this will not take effect, like the case below:
val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d")
df.selectExpr(
"sum(
d
) OVER(PARTITION BYb
,a
) as e","sum(
c
) OVER(PARTITION BYa
) as f").explain
Current plan
== Physical Plan ==
*(5) Project [e#10L, f#11L]
+- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L]
+- *(4) Sort [a#2L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(a#2L, 200), true, [id=#41]
+- *(3) Project [a#2L, c#4L, e#10L]
+- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L]
+- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=#33]
+- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L]
+- *(1) Range (0, 10, step=1, splits=10)
Expected plan:
== Physical Plan ==
*(4) Project [e#924L, f#925L]
+- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L]
+- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0
+- *(3) Project [d#43L, b#41L, a#40L, f#925L]
+- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L]
+- *(2) Sort [a#40L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(a#40L, 200), true, [id=#282]
+- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L]
+- *(1) Range (0, 10, step=1, splits=10)
Also the permutations method has a O(n!) time complexity, which is very expensive when there are many partition columns, we could try to optimize it.
Why are the changes needed?
We could apply the rule for more cases, which could improve the execution performance by eliminate unnecessary shuffle, and by reducing the time complexity from O(n!) to O(n2), the performance for the rule itself could improve
Does this PR introduce any user-facing change?
no
How was this patch tested?
UT