
SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy #789

Closed
wants to merge 5 commits

Conversation

@sryza (Contributor) commented May 15, 2014

No description provided.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15022/

@mridulm (Contributor) commented May 16, 2014

As I mentioned in the JIRA, I don't see value in this change: it is a corner case, trying to save about five lines of straightforward code while adding to the public API.

@sryza (Contributor, Author) commented May 16, 2014

Do you mind elaborating on why it's a corner case? My understanding is that Kryo is the right choice over Java serialization in 99% of cases, and any time somebody wants to use their own objects with Kryo serialization, they need something like those 8 lines of code. I've had the opportunity to teach a bunch of people how to use Spark over the last few months and how to write those lines of code has come up almost every time.
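For context, the boilerplate being referred to looks roughly like the sketch below, reconstructed from the Kryo usage documented for Spark at the time; MyClass1, MyClass2, and MyRegistrator are placeholder names.

```
// A minimal sketch of the pre-patch boilerplate: using Kryo with your own classes
// meant writing a registrator class plus two configuration settings.
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

case class MyClass1(x: Int)
case class MyClass2(s: String)

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyClass1])
    kryo.register(classOf[MyClass2])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")  // fully-qualified registrator class name in practice
```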

@mridulm (Contributor) commented May 16, 2014

I have elaborated in the JIRA, but I will repeat it here for clarity:

This is adding an API for a specific case: it assumes a single serialization type (Kryo). The reduction in boilerplate is not high, nor is it hiding anything tricky or implementation-specific.

Particularly given the emphasis on simplifying our API, this is going in the opposite direction without sufficient value to justify it.

@ash211 (Contributor) commented May 16, 2014

As an alternative to adding to SparkContext's API, how about an API along the lines of:

val classes: Seq[Class[_]] = ...
KryoRegistrator.enableKryoWithClasses(sc, classes)

The enableKryoWithClasses call would basically do the same thing as before but as a helper function.

@mateiz (Contributor) commented May 27, 2014

I like the idea of moving this into KryoSerializer or KryoRegistrator instead of SparkConf. Maybe it can be KryoSerializer.configure(conf, classes). In addition, this will need a Java-friendly API too. The easiest way is to make it take Array[Class[_]] though you have to see how that translates into Java. Please add unit tests in both Scala and Java so we can see the API being called there.

@mateiz (Contributor) commented May 27, 2014

BTW one other thing I don't fully like about this is that it's not modular -- you need to have a single list of all your classes in one place, and you can't for example have libraries that register their own classes in sequence. I'd prefer for us to think about this a bit more and maybe come up with an API that enables that. For example, imagine the following:

  • The user has their own classes
  • The user also calls MLlib, which has some classes it wants to register
  • The user also calls GraphX, which has other classes it wants to register

It would be interesting to come up with a way where all these guys register their classes in some order. For now the easiest way is for the user's KryoRegistrator to call KryoRegistrators from these other libraries inside itself.
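A hedged sketch of that workaround; MLlibKryoRegistrator and GraphXKryoRegistrator are hypothetical stand-ins for whatever registrators the libraries would provide, and only the delegation pattern matters:

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical library-provided registrators.
class MLlibKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = { /* register MLlib's classes */ }
}
class GraphXKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = { /* register GraphX's classes */ }
}

case class MyRecord(id: Long, label: String)

// The single registrator Spark is configured with; it delegates in a fixed order.
class AppKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    new MLlibKryoRegistrator().registerClasses(kryo)
    new GraphXKryoRegistrator().registerClasses(kryo)
    kryo.register(classOf[MyRecord])  // the application's own classes last
  }
}
```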

@sryza (Contributor, Author) commented May 28, 2014

Putting this in KryoRegistrator/Serializer makes sense to me.

That's a good point about the modularity.

One way to deal with it would be to add a config property that carries the classes to register, and to have the default KryoRegistrator register all the classes in this property. The user- or library-facing API could then be KryoSerializer.registerClasses(SparkConf, Array[Class[_]]), which would append to this property.

Though would there be a good place for a library to call that API? Alternatively, libraries could expose a getClassesToRegister API. So if I'm using MLlib and GraphX, I'd call KryoSerializer.configure(conf, Arrays.concatenate(myClasses, MLLibKryo.getClassesToRegister, GraphXKryo.getClassesToRegister)), and each library can include its dependencies in its list.
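A rough sketch of the append-to-a-property idea; the object name is illustrative and this is not the final API:

```
import org.apache.spark.SparkConf

object KryoClassRegistration {
  private val Key = "spark.kryo.classesToRegister"  // proposed carrier property

  // Append class names to the carrier property; a default registrator would
  // later read the property and register each listed class with Kryo.
  def registerClasses(conf: SparkConf, classes: Array[Class[_]]): SparkConf = {
    val existing = conf.get(Key, "").split(",").filter(_.nonEmpty)
    val merged = (existing ++ classes.map(_.getName)).distinct
    conf.set(Key, merged.mkString(","))
  }
}
```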

@mridulm (Contributor) commented May 28, 2014

This is what we do currently. Each module has its own registrator, and they are invoked in the order in which they are registered to our uber-registrator (since Spark supports only a single registrator), which is what is specified as the Kryo registrator.


@mateiz (Contributor) commented Jul 29, 2014

@sryza are you still looking at coming up with a more modular version of this? If not, it would be good to close the PR for now.

@sryza (Contributor, Author) commented Jul 29, 2014

@mateiz I posted a couple ideas and was waiting on feedback. Any thoughts?

@mateiz (Contributor) commented Jul 29, 2014

Ah, I see. In that case I'd prefer something like this:

MLlibUtils.registerKryoClasses(conf)
GraphXUtils.registerKryoClasses(conf)
conf.registerKryoClasses(Array(/* your classes */))

The one tricky bit is if two libraries use the same classes. Maybe we can keep track of which classes are registered somehow, otherwise Kryo might get confused.

@sryza (Contributor, Author) commented Sep 24, 2014

Updated patch adds the APIs discussed. It relies on a new property spark.kryo.classesToRegister, which registerKryoClasses appends to. The change also enables users to register classes entirely through configuration, by setting this property directly.
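From the caller's side, usage of the new API looks roughly like this minimal sketch (MyClass1 and MyClass2 are placeholder application classes):

```
import org.apache.spark.SparkConf

case class MyClass1(x: Int)
case class MyClass2(s: String)

val conf = new SparkConf()
  .setAppName("kryo-example")
  .registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

// Equivalent, purely through configuration, e.g. on the spark-submit command line:
//   --conf spark.kryo.classesToRegister=com.example.MyClass1,com.example.MyClass2
```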

@SparkQA commented Sep 24, 2014

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20764/

@andrewor14 (Contributor) commented:

retest this please

Inline review on the registerKryoClasses diff:

 * If called multiple times, this will append the classes from all calls together.
 */
def registerKryoClasses(classes: Seq[Class[_ <: Any]]): SparkConf = {
  val allClassNames = new LinkedHashSet[String]()
Review comment (Contributor):

Does the order of registering these classes matter? Here we're not preserving the order set through spark.kryo.classesToRegister, but maybe that's not important.

Review comment (Contributor, Author):

Here's a thread that indicates order is important, at least in the sense that the Kryo instance doing the serializing should register classes in the same order as the instance doing the deserializing:
https://groups.google.com/forum/#!topic/kryo-users/E-0_EVi-O1Q

String's hash code is deterministic, so it shouldn't really matter, but I think it doesn't hurt to be conservative.

The LinkedHashSet should preserve order, unless I'm missing something?

Review comment (Contributor):

Ah never mind, it's a LinkedHashSet. I always trip over these java.util.LinkedHash* collections and think they're just hash collections that don't maintain ordering.
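For reference, a quick illustration of the behavior being relied on here:

```
import java.util.LinkedHashSet

// java.util.LinkedHashSet de-duplicates while keeping insertion order, so merging
// class names from the property and from new calls does not reorder them.
val names = new LinkedHashSet[String]()
Seq("b.ClassB", "a.ClassA", "b.ClassB", "c.ClassC").foreach(n => names.add(n))
println(names)  // [b.ClassB, a.ClassA, c.ClassC]
```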

@SparkQA commented Sep 25, 2014

QA tests have started for PR 789 at commit 22bbeec.

  • This patch merges cleanly.

Inline review on the registrator lookup in the diff:

try {
  val reg = conf.getOption("spark.kryo.registrator").map(
    Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]).getOrElse(
    new DefaultKryoRegistrator(conf))
Review comment (Contributor):

Can you use this style for easier readability

val reg = conf.getOption(...)
  .map(...)
  .getOrElse(...)
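Applied to the fragment above, the restyled version would read something like this (classLoader and DefaultKryoRegistrator come from the patch under review, not from a released API):

val reg = conf.getOption("spark.kryo.registrator")
  .map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator])
  .getOrElse(new DefaultKryoRegistrator(conf))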

@andrewor14 (Contributor) commented:

Hey @sryza, this looks good. Creating a custom registrator class is cumbersome if the user is just trying things out, and now it takes just one extra line. Will the new API work in Java? Also, spark.kryo.registrator is still documented in the list of configurations. Maybe we should add an entry for spark.kryo.classesToRegister to that list as well, in addition to showing how the user can add classes dynamically as you have done.

@SparkQA commented Sep 25, 2014

QA tests have finished for PR 789 at commit 22bbeec.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20814/

@sryza (Contributor, Author) commented Sep 30, 2014

That's correct (documented this on the conf page). My thought was that we could hit strange interactions, for example if the same class is registered both with a custom registrator and with the classes given through registerKryoClasses. I can add this in if you think having both would be useful.

@mateiz (Contributor) commented Oct 2, 2014

I think it's better to do both and explain that there might be problems. Otherwise users will see this new API and perhaps be surprised that their old registrator is no longer called. Not everyone reads the docs on the new API, so they might never notice, and just get poor performance.

BTW looking at Kryo's docs, it does support multiple register calls on the same class, and it just uses the value from the last one. So it will probably do the right thing here if we call their custom registrator last.
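A hedged sketch of that ordering; the helper name is illustrative, and the point is only that the custom registrator runs after the config-listed classes so its registrations take precedence:

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Register config-listed classes first, then run the user's registrator last;
// Kryo keeps the most recent registration for a class, so the user's settings win.
def setupKryo(kryo: Kryo,
              classesFromConf: Seq[Class[_]],
              userRegistrator: Option[KryoRegistrator]): Unit = {
  classesFromConf.foreach(c => kryo.register(c))
  userRegistrator.foreach(_.registerClasses(kryo))
}
```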

@sryza (Contributor, Author) commented Oct 2, 2014

Updated patch allows using both at the same time.

@SparkQA commented Oct 2, 2014

QA tests have started for PR 789 at commit 1be3fa5.

  • This patch merges cleanly.

@SparkQA commented Oct 2, 2014

QA tests have finished for PR 789 at commit 1be3fa5.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • throw new SparkException("Failed to load class to register with Kryo", e)

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21175/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21182/

@sryza (Contributor, Author) commented Oct 2, 2014

Jenkins, retest this please

@SparkQA commented Oct 2, 2014

QA tests have started for PR 789 at commit 2def654.

  • This patch merges cleanly.

@SparkQA commented Oct 2, 2014

QA tests have finished for PR 789 at commit 2def654.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • throw new SparkException("Failed to load class to register with Kryo", e)

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21185/

@mateiz (Contributor) commented Oct 13, 2014

Thanks, this looks good to me now, though there are some merge conflicts. Mind rebasing it?

@SparkQA commented Oct 14, 2014

QA tests have started for PR 789 at commit 48b05e9.

  • This patch merges cleanly.

@SparkQA commented Oct 14, 2014

QA tests have finished for PR 789 at commit 48b05e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • throw new SparkException("Failed to load class to register with Kryo", e)

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21726/

@pwendell (Contributor) commented:

Thanks Sandy - I've merged this!

@asfgit closed this in 6bb56fa on Oct 22, 2014
cloud-fan pushed a commit that referenced this pull request Jun 21, 2021
…subquery reuse

### What changes were proposed in this pull request?
This PR:
1. Fixes an issue in the `ReuseExchange` rule that can result in a `ReusedExchange` node pointing to an invalid exchange. This can happen due to the 2 separate traversals in `ReuseExchange`, when the 2nd traversal modifies an exchange that has already been referenced (reused) in the 1st traversal.
   Consider the following query:
   ```
   WITH t AS (
     SELECT df1.id, df2.k
     FROM df1 JOIN df2 ON df1.k = df2.k
     WHERE df2.id < 2
   )
   SELECT * FROM t AS a JOIN t AS b ON a.id = b.id
   ```
   Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
   ```
   == Physical Plan ==
   *(7) SortMergeJoin [id#14L], [id#18L], Inner
   :- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#14L, 5), true, [id=#298]
   :     +- *(2) Project [id#14L, k#17L]
   :        +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
   :           :- *(2) Project [id#14L, k#15L]
   :           :  +- *(2) Filter isnotnull(id#14L)
   :           :     +- *(2) ColumnarToRow
   :           :        +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :           :              +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#289]
   :           :                 +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
   :           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
   :              +- *(1) Project [k#17L]
   :                 +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   +- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#184] <== this reuse node points to a non-existing node
   ```
   After this PR:
   ```
   == Physical Plan ==
   *(7) SortMergeJoin [id#14L], [id#18L], Inner
   :- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#14L, 5), true, [id=#231]
   :     +- *(2) Project [id#14L, k#17L]
   :        +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
   :           :- *(2) Project [id#14L, k#15L]
   :           :  +- *(2) Filter isnotnull(id#14L)
   :           :     +- *(2) ColumnarToRow
   :           :        +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :           :              +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#103]
   :           :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
   :           :                    +- *(1) Project [k#17L]
   :           :                       +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
   :           :                          +- *(1) ColumnarToRow
   :           :                             +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :           +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
   +- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#231]
   ```
2. Fixes an issue with the separate, consecutive `ReuseExchange` and `ReuseSubquery` rules that can result in a `ReusedExchange` node pointing to an invalid exchange. This can happen because, with the 2 separate rules, the `ReuseSubquery` rule can modify an exchange that has already been referenced (reused) by the `ReuseExchange` rule.
   Consider the following query:
   ```
   WITH t AS (
     SELECT df1.id, df2.k
     FROM df1 JOIN df2 ON df1.k = df2.k
     WHERE df2.id < 2
   ),
   t2 AS (
     SELECT * FROM t
     UNION
     SELECT * FROM t
   )
   SELECT * FROM t2 AS a JOIN t2 AS b ON a.id = b.id
   ```
   Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
   ```
   == Physical Plan ==
   *(15) SortMergeJoin [id#46L], [id#58L], Inner
   :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#979]
   :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#975]
   :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :              +- Union
   :                 :- *(2) Project [id#46L, k#49L]
   :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                 :     :- *(2) Project [id#46L, k#47L]
   :                 :     :  +- *(2) Filter isnotnull(id#46L)
   :                 :     :     +- *(2) ColumnarToRow
   :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
   :                 :     :                 +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   :                 :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   :                 :        +- *(1) Project [k#49L]
   :                 :           +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
   :                 :              +- *(1) ColumnarToRow
   :                 :                 +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :                 +- *(4) Project [id#46L, k#49L]
   :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                       :- *(4) Project [id#46L, k#47L]
   :                       :  +- *(4) Filter isnotnull(id#46L)
   :                       :     +- *(4) ColumnarToRow
   :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
   :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#761] <== this reuse node points to a non-existing node
   ```
   After this PR:
   ```
   == Physical Plan ==
   *(15) SortMergeJoin [id#46L], [id#58L], Inner
   :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#793]
   :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#789]
   :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :              +- Union
   :                 :- *(2) Project [id#46L, k#49L]
   :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                 :     :- *(2) Project [id#46L, k#47L]
   :                 :     :  +- *(2) Filter isnotnull(id#46L)
   :                 :     :     +- *(2) ColumnarToRow
   :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
   :                 :     :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   :                 :     :                    +- *(1) Project [k#49L]
   :                 :     :                       +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
   :                 :     :                          +- *(1) ColumnarToRow
   :                 :     :                             +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :                 :     +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   :                 +- *(4) Project [id#46L, k#49L]
   :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                       :- *(4) Project [id#46L, k#47L]
   :                       :  +- *(4) Filter isnotnull(id#46L)
   :                       :     +- *(4) ColumnarToRow
   :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
   :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#793]
   ```
   (This example contains issue 1 as well.)

3. Improves the reuse of exchanges and subqueries by enabling reuse across the whole plan. This means the new combined rule exploits reuse opportunities between parent queries and subqueries by traversing the whole plan. The traversal is started on the top-level query only.

4. Due to the order of traversal this PR uses while adding reuse nodes, the reuse nodes appear in parent queries if reuse is possible between different levels of queries (typical for DPP). This is not an issue from an execution perspective, but it does mean "forward references" in the formatted explain output, where parent queries come first. The changes I made to `ExplainUtils` are there to handle these references properly.

This PR addresses the issues and improvement above by unifying the separate rules into a `ReuseExchangeAndSubquery` rule that does a one-pass, whole-plan, bottom-up traversal.
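Conceptually, the single pass keeps the first occurrence of each node under a canonical key and points later, structurally equal nodes back at it. The sketch below is a simplified stand-in, not the actual Spark rule or its types:

```
import scala.collection.mutable

// Simplified reuse-map sketch: "Plan" and the canonical key are stand-ins for the
// real Spark plan types; only the lookup-or-record pattern is illustrated.
final class ReuseMap[Plan](canonicalKey: Plan => AnyRef) {
  private val firstSeen = mutable.Map.empty[AnyRef, Plan]

  // Return the node recorded earlier under the same canonical key, if any,
  // otherwise record this node as the one future duplicates will point to.
  def reuseOrElseAdd(plan: Plan): Plan =
    firstSeen.getOrElseUpdate(canonicalKey(plan), plan)
}
```

Because the lookup happens during one bottom-up traversal over the whole plan, including subqueries, a reuse reference can only point at a node that is actually kept in the final plan, which is what removes the dangling `ReusedExchange` references shown above.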

### Why are the changes needed?
Performance improvement.

### How was this patch tested?
- New UTs in `ReuseExchangeAndSubquerySuite` to cover 1. and 2.
- New UTs in `DynamicPartitionPruningSuite`, `SubquerySuite` and `ExchangeSuite` to cover 3.
- New `ReuseMapSuite` to test `ReuseMap`.
- Checked new golden files of `PlanStabilitySuite`s for invalid reuse references.
- TPCDS benchmarks.

Closes #28885 from peter-toth/SPARK-29375-SPARK-28940-whole-plan-reuse.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Agirish pushed a commit to HPEEzmeral/apache-spark that referenced this pull request May 5, 2022