
[SPARK-17556][SQL] Executor side broadcast for broadcast joins #15178

Closed
wants to merge 12 commits into from

Conversation


@viirya viirya commented Sep 21, 2016

What changes were proposed in this pull request?

Spark's existing broadcast mechanism collects the result of an RDD to the driver and then broadcasts it, which introduces extra latency. We can instead broadcast the RDD directly from the executors. This patch implements broadcast from executors and applies it to broadcast joins in Spark SQL.
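For contrast, a minimal sketch of the driver-side pattern this patch avoids, using the standard collect and sc.broadcast APIs:

    // Driver-side broadcast: the RDD result is first collected to the driver,
    // then redistributed to the executors via sc.broadcast.
    val rdd = sc.parallelize(1 to 4, 2)
    val collected = rdd.collect()              // all data moves through the driver
    val broadcasted = sc.broadcast(collected)
    sc.parallelize(1 to 2, 2).map(_ => broadcasted.value.sum).collect()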

The advantages of executor-side broadcast:

  • The RDD data doesn't need to be collected to the driver before broadcasting
  • The driver is no longer a bottleneck for data transmission at the beginning of the broadcast

Design document: https://issues.apache.org/jira/secure/attachment/12831201/executor-side-broadcast.pdf

Major API changes

  • New API broadcastRDDOnExecutor in SparkContext

    It takes two parameters, rdd: RDD[T] and mode: BroadcastMode[T]. It broadcasts the content of the rdd between executors without collecting it back to the driver. mode is used to convert the content of the rdd into the broadcast object.

    Besides T, this API has another type parameter U, which is the type of the converted object.

  • New Broadcast implementation TorrentExecutorBroadcast

    Unlike TorrentBroadcast, this implementation doesn't divide and store the object data to be broadcast in the driver. The executors use local and remote fetches to retrieve the blocks of the RDD and convert the RDD content into the broadcast object.

  • BroadcastMode is moved from org.apache.spark.sql.catalyst.plans.physical to org.apache.spark.broadcast

    It now takes a type parameter T, which is the type of the data being converted into the broadcast object on the executors. A rough sketch of the reshaped trait is shown below.
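    A minimal sketch of what the reshaped trait looks like to callers, inferred from the usage example below; the exact members in the patch may differ:

    trait BroadcastMode[T] extends Serializable {
      // Converts the rows of the RDD (of element type T) into the object that
      // will be broadcast to the executors, e.g. an array or a hashed relation.
      def transform(rows: Array[T]): Any
    }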

Usage: How to use executor-side broadcast

To broadcast the result of an RDD without collecting it back to the driver and broadcasting it from there, we can use the executor-side broadcast feature proposed here.

  1. Prepare the RDD to be broadcast

    // To broadcast the RDD on executors,
    // we should materialize and cache the result of the RDD
    val rdd = sc.parallelize(1 to 4, 2).cache()
    rdd.count()
    
  2. Define how to transform the result of the RDD with BroadcastMode

    val mode = new BroadcastMode[Int] {
      override def transform(rows: Array[Int]): Array[Int] = rows
    }
    
  3. Broadcast the RDD and use the broadcast variable

    val broadcastedVal = sc.broadcastRDDOnExecutor[Int, Array[Int]](rdd, mode)
    val collected = sc.parallelize(1 to 2, 2).map { _ =>
      broadcastedVal.value.reduce(_ + _) // 1 + 2 + 3 + 4 = 10
    }.collect()
    assert(collected.sum == 20)
    

How was this patch tested?

Jenkins tests.


SparkQA commented Sep 21, 2016

Test build #65709 has finished for PR 15178 at commit 57987d1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait BroadcastMode[T]
    • case class BroadcastDistribution(mode: BroadcastMode[InternalRow]) extends Distribution
    • case class BroadcastPartitioning(mode: BroadcastMode[InternalRow]) extends Partitioning
    • case class BroadcastExchangeExec[T: ClassTag](


viirya commented Sep 23, 2016

cc @rxin Can you help review this? Thanks.

@viirya viirya changed the title [SPARK-17556] Executor side broadcast for broadcast joins [SPARK-17556][SQL] Executor side broadcast for broadcast joins Sep 26, 2016

holdenk commented Sep 28, 2016

This is really interesting and might enable some nice improvements for online ML training in structured streaming as well :) I've got a few questions around unpersist behaviour, but I'll dig into this PR more next week. Hopefully @rxin or @JoshRosen can also take a look :)

* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*/
def broadcastRDDOnExecutor[T: ClassTag, U: ClassTag](
Contributor

We might want to mark this as a developer API for now?

Member Author

Yes.

* Remove all persisted state associated with this Torrent broadcast on the executors.
*/
override protected def doUnpersist(blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
Contributor

@holdenk holdenk Sep 28, 2016

Similar comment to as above - does this do what we want?

val data = b.data.asInstanceOf[Iterator[T]].toArray
// We found the block from remote executors' BlockManager, so put the block
// in this executor's BlockManager.
if (!bm.putIterator(pieceId, data.toIterator,
Contributor

So we're storing an RDD pieceId here, but I think unpersist only removes blocks with a BroadcastBlockId and the correct ID. Maybe it would be good to add a test around unpersisting to verify it's behaving as expected?

Member Author

For RDDs, there is already a cleaning mechanism that removes the persisted pieces once the RDD is no longer referenced. Because we fetch and use RDD pieces here, rather than the broadcast pieces used by driver-side broadcast, I think it's fine to leave the cleanup to that existing mechanism.

Member Author

One solution might be to store the fetched RDD pieces under broadcast piece IDs, so that unpersist can remove all the fetched pieces. However, we would then have to fetch both RDD piece IDs and broadcast piece IDs from other executors under the BitTorrent-like approach. So I'd prefer the approach above and let the current cleaning mechanism do its work.
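A small illustration of the two block-ID kinds being discussed, assuming the existing org.apache.spark.storage classes: broadcast unpersist only matches BroadcastBlockId entries, so blocks kept under an RDDBlockId are left to the RDD's own cleanup path.

    import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId}

    // Broadcast cleanup looks for blocks named like this:
    val broadcastPiece = BroadcastBlockId(0L, field = "piece0")
    // ...while the pieces fetched in this PR stay under RDD block IDs,
    // which only RDD unpersist / the ContextCleaner removes:
    val rddPiece = RDDBlockId(rddId = 1, splitIndex = 0)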


holdenk commented Sep 29, 2016

Actually, another (related) question is what happens when the RDD backing the broadcast is unpersisted? I think it would be good to have tests around this as well.


viirya commented Sep 29, 2016

@holdenk If the RDD pieces have already been fetched on the executors, it won't affect the broadcast object. If the fetching isn't done yet, reading the blocks will fail. Let me think about whether I can add a test for it.


SparkQA commented Sep 29, 2016

Test build #66101 has finished for PR 15178 at commit f50cf31.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


holdenk commented Sep 29, 2016

@viirya what I mean is that right now, I think the executors will fetch the blocks, and those blocks might not get cleaned up once the broadcast is destroyed. You could add a test to see whether the blocks are still present everywhere after unpersist. The other question is that if someone broadcasts a cached RDD and then unpersists it, I'm worried it might clean up the broadcast blocks on the executors as well. You could add a test to see whether the broadcast can still be used after the backing RDD is unpersisted (or, if we don't want to support that use case, add a note about it to the docs and make sure it fails in a clear manner).


viirya commented Sep 30, 2016

@holdenk For the first question: since executor-side broadcast uses the RDD blocks directly instead of creating broadcast blocks, destroying the broadcast cleans up only the broadcast object. I will add a test for this.

For the second question, it depends on whether the broadcast object has already been created on the executors. If it has, unpersisting the RDD doesn't affect it. If it hasn't, the executors will fail when they try to fetch the RDD blocks and create the object.
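A rough sketch of the first scenario (not the actual test added to the patch), assuming a local SparkContext and reusing the rdd and mode definitions from the usage example in the description:

    // Destroying the broadcast should remove only the broadcast object;
    // the cached RDD blocks should remain usable afterwards.
    val rdd = sc.parallelize(1 to 4, 2).cache()
    rdd.count()
    val bc = sc.broadcastRDDOnExecutor[Int, Array[Int]](rdd, mode)
    bc.destroy()
    assert(rdd.count() == 4)  // the backing RDD is still cached and intact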


viirya commented Sep 30, 2016

@holdenk I added the test cases.

@viirya viirya force-pushed the broadcast-on-executors branch from 1339daf to 3494728 Compare September 30, 2016 07:23

SparkQA commented Sep 30, 2016

Test build #66162 has finished for PR 15178 at commit 1339daf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait BroadcastMode[T] extends Serializable


viirya commented Sep 30, 2016

retest this please.


SparkQA commented Sep 30, 2016

Test build #66163 has finished for PR 15178 at commit 3494728.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait BroadcastMode[T] extends Serializable


SparkQA commented Sep 30, 2016

Test build #66165 has finished for PR 15178 at commit 3494728.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait BroadcastMode[T] extends Serializable


holdenk commented Sep 30, 2016

So if the primary use of this is inside SQL, then that might be OK (because we can just be very careful about it) - but since we are also exposing it to users, it feels like this behaviour will probably catch some people by surprise (and at the very least we should document it). Maybe it would make sense to update the cleaning logic somehow, or store the blocks differently so the current cleaning logic behaves as expected - but it would be really good to hear what @rxin or @JoshRosen think about this because I'm a little uncertain.


viirya commented Sep 30, 2016

@holdenk After rethinking this, I have an idea for avoiding this surprise. Let me refactor it, and please review it then. Thanks!


holdenk commented Sep 30, 2016

Awesome - so excited to see this :)


viirya commented Oct 1, 2016

@holdenk This is updated.

Now the only requirement is that the RDD to be broadcast on the executor side is cached and materialized first. Broadcast blocks are created immediately on the executors. Once you have the broadcast variable, you can unpersist the RDD and the broadcast variable can still be used.
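A minimal sketch of the updated behaviour under the refactored patch, reusing the names from the usage example in the description:

    val rdd = sc.parallelize(1 to 4, 2).cache()
    rdd.count()  // materialize the cached blocks first

    val broadcastedVal = sc.broadcastRDDOnExecutor[Int, Array[Int]](rdd, mode)

    // Broadcast blocks were already created on the executors, so unpersisting
    // the backing RDD no longer invalidates the broadcast variable.
    rdd.unpersist(blocking = true)
    val sums = sc.parallelize(1 to 2, 2).map(_ => broadcastedVal.value.sum).collect()
    assert(sums.forall(_ == 10))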


SparkQA commented Oct 1, 2016

Test build #66196 has finished for PR 15178 at commit 17b4470.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class RowBroadcastMode extends BroadcastMode[InternalRow]
    • case class BroadcastDistribution(mode: RowBroadcastMode) extends Distribution
    • case class BroadcastPartitioning(mode: RowBroadcastMode) extends Partitioning


SparkQA commented Oct 1, 2016

Test build #66199 has finished for PR 15178 at commit 0440cc7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


viirya commented Oct 1, 2016

retest this please.


SparkQA commented Oct 1, 2016

Test build #66204 has finished for PR 15178 at commit 0440cc7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jan 4, 2017

Test build #70877 has finished for PR 15178 at commit 1b499d1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


viirya commented Feb 8, 2017

ping @rxin Do you have any more thoughts or feedback for this? Thanks.


SparkQA commented Jun 13, 2017

Test build #77963 has finished for PR 15178 at commit 34a49d5.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 13, 2017

Test build #77967 has finished for PR 15178 at commit 4b31c7e.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya force-pushed the broadcast-on-executors branch from 4b31c7e to 59d96d7 Compare June 13, 2017 06:14

SparkQA commented Jun 13, 2017

Test build #77968 has finished for PR 15178 at commit 59d96d7.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya force-pushed the broadcast-on-executors branch from 59d96d7 to ecebb3f Compare June 13, 2017 06:37

SparkQA commented Jun 13, 2017

Test build #77972 has started for PR 15178 at commit ecebb3f.


viirya commented Jun 13, 2017

retest this please.


SparkQA commented Jun 13, 2017

Test build #77979 has finished for PR 15178 at commit ecebb3f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 13, 2017

Test build #77992 has finished for PR 15178 at commit ef987ae.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


viirya commented Jun 13, 2017

retest this please.


SparkQA commented Jun 14, 2017

Test build #78011 has finished for PR 15178 at commit ef987ae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


viirya commented Sep 11, 2017

@rxin Do we still consider to incorporate this broadcast on executor feature? Thanks.


SparkQA commented Sep 11, 2017

Test build #81636 has finished for PR 15178 at commit 93ecabb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


viirya commented Oct 31, 2017

retest this please.


SparkQA commented Oct 31, 2017

Test build #83255 has finished for PR 15178 at commit a1f2faa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

The current solution could OOM the executors, whose memory is normally much smaller than the driver's. We might also see a performance regression when the number of partitions is large and the partition sizes are small.

How about closing this PR now? We can revisit it when we need this feature.


viirya commented Nov 1, 2017

Well, there are many aspects of this feature that could be discussed. I agree we can close this for now, since it has been open for a while and there seems to be no urgent need for it.

@viirya viirya closed this Nov 1, 2017
if (dataSize >= (8L << 30)) {
// Call persist on the RDD because we want to broadcast the RDD blocks on executors.
childRDD = child.execute().mapPartitionsInternal { rowIterator =>
rowIterator.map(_.copy())


I know the PR has been closed, but I was interested in understanding the code.
Why is a copy of each row made before persisting the child RDD?

Member Author

In Spark SQL, the underlying mechanism reuses a single object (the row here) while iterating. We must manually copy the row in the map before persisting; otherwise the persisted result ends up containing the same row repeated.
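A simplified, self-contained illustration of the object-reuse pitfall (plain Scala with a hypothetical MutableRow class, not the actual Spark SQL row types):

    // The iterator below reuses one mutable object, mimicking how Spark SQL
    // reuses a single row while iterating.
    class MutableRow(var value: Int)

    val reused = new MutableRow(0)
    def rows: Iterator[MutableRow] = (1 to 4).iterator.map { i => reused.value = i; reused }

    val withoutCopy = rows.toArray.map(_.value)                                    // Array(4, 4, 4, 4)
    val withCopy    = rows.map(r => new MutableRow(r.value)).toArray.map(_.value)  // Array(1, 2, 3, 4)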


amoghmargoor commented Apr 11, 2019

@viirya Thanks for this diff.
We found one issue here, which I wanted to point out in case somebody wants to use this patch.
There are references to broadcast.value in BroadcastHashJoinExec that are executed on the driver. Because of the code-generation flow, that can pull the broadcast RDD values into the driver's block manager as well. To fix it, we took a shortcut and avoided one hash-join optimization in codegen for the case where the keys on the build side are unique. I'm not sure whether there is a solution that doesn't require sacrificing that optimization.

@viirya viirya deleted the broadcast-on-executors branch December 27, 2023 18:34