Improve GpuExpand by pre-projecting some columns [databricks] #10247

Conversation
Some rules in Spark will put non-leaf expressions into Expand projections, so the GPU tiered projection cannot be leveraged across the projection lists. This PR factors out these expressions and evaluates them before expanding, to avoid duplicate evaluation of semantically equal (sub)expressions.

Signed-off-by: Firestarman <[email protected]>
@@ -1079,6 +1079,17 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
    .booleanConf
    .createWithDefault(true)

  val ENABLE_EXPAND_PREPROJECT = conf("spark.rapids.sql.expandPreproject.enabled")
Introducing an internal configuration adds some extra user burden. Could we have a check utility method to figure out whether a common expression exists? If we do have a common subexpression, we introduce a pre-project ahead of the Expand node; otherwise, we do nothing there.
It seems we can do this via a dry run of the tiered project: if a duplicated ref exists in the Seq[Seq[Expression]], we introduce a pre-project node to evaluate those duplicated refs.
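The dry-run idea can be sketched with a toy expression tree. This is illustrative Python, not the real Catalyst/spark-rapids API; `Attr`, `Add`, and `has_common_sub_expr` are hypothetical names. Count how often each non-leaf subexpression occurs across all projection lists, and only insert a pre-project when something occurs more than once:

```python
# Simplified stand-in for expression trees (not the real Catalyst API).
from dataclasses import dataclass
from collections import Counter

@dataclass(frozen=True)
class Attr:
    name: str

@dataclass(frozen=True)
class Add:
    left: object
    right: object

def sub_exprs(e):
    """Yield every non-leaf subexpression of an expression tree."""
    if isinstance(e, Add):
        yield e
        yield from sub_exprs(e.left)
        yield from sub_exprs(e.right)

def has_common_sub_expr(projections):
    """Dry run: a pre-project pays off when some non-leaf subexpression
    appears more than once across the projection lists."""
    counts = Counter(s for plist in projections for e in plist for s in sub_exprs(e))
    return any(n > 1 for n in counts.values())

a, b, c = Attr("a"), Attr("b"), Attr("c")
shared = [[Add(a, b)], [Add(Add(a, b), c)]]   # both lists build on a + b
disjoint = [[a], [c]]
print(has_common_sub_expr(shared))    # True
print(has_common_sub_expr(disjoint))  # False
```

Frozen dataclasses give structural equality and hashing, which is what makes the `Counter` de-duplication work here; the real code relies on Catalyst's semantic-equality machinery instead.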
spark-rapids/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
Lines 492 to 507 in 5d08aec
/**
 * Do projections in a tiered fashion, where earlier tiers contain sub-expressions that are
 * referenced in later tiers. Each tier adds columns to the original batch corresponding
 * to the output of the sub-expressions. It also removes columns that are no longer needed,
 * based on inputAttrTiers for the current tier and the next tier.
 * Example of how this is processed:
 *   Original projection expressions:
 *     (((a + b) + c) * e), (((a + b) + d) * f), (a + e), (c + f)
 *   Input columns for tier 1: a, b, c, d, e, f  (original projection inputs)
 *   Tier 1: (a + b) as ref1
 *   Input columns for tier 2: a, c, d, e, f, ref1
 *   Tier 2: (ref1 + c) as ref2, (ref1 + d) as ref3
 *   Input columns for tier 3: a, c, e, f, ref2, ref3
 *   Tier 3: (ref2 * e), (ref3 * f), (a + e), (c + f)
 */
case class GpuTieredProject(exprTiers: Seq[Seq[GpuExpression]]) {
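As a plain-value illustration of the tiered evaluation described in the comment above (a Python sketch over arbitrary sample scalars, not the columnar GPU code), each shared subexpression is computed once per tier and reused by later tiers:

```python
# Evaluate the doc comment's example in tiers, computing each shared
# subexpression once. Column values are arbitrary sample scalars.
cols = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5, "f": 6}

# Tier 1: (a + b) as ref1
cols["ref1"] = cols["a"] + cols["b"]
# Tier 2: (ref1 + c) as ref2, (ref1 + d) as ref3
cols["ref2"] = cols["ref1"] + cols["c"]
cols["ref3"] = cols["ref1"] + cols["d"]
# Tier 3: final projections (ref2 * e), (ref3 * f), (a + e), (c + f)
result = [cols["ref2"] * cols["e"], cols["ref3"] * cols["f"],
          cols["a"] + cols["e"], cols["c"] + cols["f"]]

# Same answers as evaluating the original expressions directly,
# but (a + b) was computed once instead of twice.
a, b, c, d, e, f = (cols[k] for k in "abcdef")
assert result == [((a + b) + c) * e, ((a + b) + d) * f, a + e, c + f]
print(result)  # [30, 42, 6, 9]
```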
We can change the default value to true to eliminate the user's extra burden. But I do not want to do this until we run enough tests to prove this will not lead to any regressions, especially for high GPU memory pressure cases.

> figure out the existence of common expression. If we do have a common subexpression, we will intro a pre-project ahead of Expand node. Otherwise, we do nothing there.

I guess the check boundPreprojections.exprTiers.size > 1 will do what you suggested.
For me it all comes down to heuristics and our confidence in them.
If we don't have high confidence in the feature, then we should have a config so a customer can disable it if they run into problems, but there should be a follow-on issue to remove the config once we feel confident in it. If we have really high confidence in the solution, then we don't need any kind of config. If we have such low confidence in it that we don't want it on yet, then it is probably not ready to be checked in. If it is for a feature that is still a WIP, then that is fine; we will enable it when the feature is done. But for a small feature like this it is not OK to have it off by default.
A runtime heuristic that decides when and how to apply this feature is separate from the config, because the config is there to help us after the feature has been released, while the heuristic is a part of the feature itself. From a computation standpoint I don't see how this would ever be worse than what we have today. But I can envision cases where the total memory usage might be much higher with this feature than without it, though I don't know how realistic those cases are in practice. For example, lots and lots of distinct operations on computed columns, with only a very small amount of overlap between them:
select COUNT(distinct a + b) as ab, COUNT(distinct a + b + c) as abc, COUNT(distinct CAST(a as STRING)), COUNT(distinct CAST(b as STRING)), ...
I am happy to see that literal values are not materialized until later. Also, from what I can tell, the initial project throws away any columns that are not going to be materialized in the final output.
There are a few more things we might be able to do to reduce the memory, but I think it ends up with us guessing at the sizes of various outputs and then solving some form of a bin-packing problem to reduce the amount of memory used.
build
val ENABLE_EXPAND_PREPROJECT = conf("spark.rapids.sql.expandPreproject.enabled")
      s"enable this.")
    .internal()
    .booleanConf
    .createWithDefault(false)
Please enable this by default.
updated
assert_gpu_and_cpu_are_equal_sql(get_df,
    "pre_pro",
    "select count(distinct (a+b)), count(distinct if((a+b)>100, c, null)) from pre_pro group by a",
Offline I synced with @revans2; it would be great to have some test coverage around cube and rollup besides count distinct.
added
build
build
LGTM. Just need to confirm whether this can improve performance in our targeted workload.
Approved pending performance checks
I ran some performance tests myself using a set of queries that represent a specific customer that was seeing slowness. Even though it is not a perfect representation, I saw an improvement with this patch from a median time of 10.867s to 7.005s, or about 50% faster than it is today. I am going to merge this in. I also rebuilt it after up-merging and it looks good.
Thanks a lot for the perf tests.
closes #10249
Some rules (e.g. RewriteDistinctAggregates) in Spark will put non-leaf expressions into Expand projections, so the GPU tiered projection cannot be leveraged across the projection lists.
This PR factors out these expressions and evaluates them before expanding, to avoid duplicate evaluation of semantically equal (sub)expressions.
For example, given projection lists that each contain a+b: without pre-projection, a+b will be evaluated twice, while with pre-projection, by leveraging the tiered projection on the pre-projection list, a+b will be evaluated only once.
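A minimal sketch of that effect, using plain Python rows rather than columnar batches (the `add_ab` helper, the evaluation counter, and the row layout are invented for illustration): two Expand projection lists both need a+b, mirroring the count-distinct test query above, and pre-projecting it halves the number of evaluations.

```python
# Sketch only (not the real GpuExpandExec code): factor a duplicated
# subexpression out of Expand projection lists into one pre-projection.
rows = [{"a": 1, "b": 2, "c": 100}, {"a": 3, "b": 4, "c": 200}]

evals = {"count": 0}
def add_ab(row):
    evals["count"] += 1          # count how often a + b is evaluated
    return row["a"] + row["b"]

# Without pre-projection: each Expand projection list re-evaluates a + b.
expanded = []
for row in rows:
    expanded.append([add_ab(row), None])                              # list 1: (a+b)
    expanded.append([None, row["c"] if add_ab(row) > 100 else None])  # list 2: if((a+b)>100, c, null)
assert evals["count"] == 2 * len(rows)

# With pre-projection: evaluate a + b once per row, then both lists reuse it.
evals["count"] = 0
expanded_pre = []
for row in rows:
    ab = add_ab(row)             # the pre-projected column
    expanded_pre.append([ab, None])
    expanded_pre.append([None, row["c"] if ab > 100 else None])
assert evals["count"] == len(rows)
assert expanded_pre == expanded  # same output, half the evaluations
print("evaluations with pre-projection:", evals["count"])
```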