
[SPARK-3386] Share and reuse SerializerInstances in shuffle paths #5606

Closed
JoshRosen wants to merge 3 commits

Conversation

JoshRosen (Contributor)

This patch modifies several shuffle-related code paths to share and re-use `SerializerInstance`s instead of creating new ones. Some serializers, such as `KryoSerializer` or `SqlSerializer`, can be fairly expensive to create or may consume moderate amounts of memory, so it's probably best to avoid unnecessary serializer creation in hot code paths.

The key change in this patch is modifying `getDiskWriter()` / `DiskBlockObjectWriter` to accept `SerializerInstance`s instead of `Serializer`s (which are factories for instances). This allows the disk writer's creator to decide whether the serializer instance can be shared or re-used.
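For illustration, a minimal sketch of the shape of this change (simplified; the parameter types here are assumptions based on the diff below, and the real method may differ):

```scala
import java.io.File
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.{Serializer, SerializerInstance}
import org.apache.spark.storage.{BlockId, BlockObjectWriter}

trait DiskWriterBefore {
  // Before: given a Serializer (a factory), so the writer had to call
  // newInstance() itself, once per writer.
  def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer,
      bufferSize: Int, writeMetrics: ShuffleWriteMetrics): BlockObjectWriter
}

trait DiskWriterAfter {
  // After: given an already-created SerializerInstance, so the caller
  // decides whether one instance is shared across many writers.
  def getDiskWriter(blockId: BlockId, file: File, serializerInstance: SerializerInstance,
      bufferSize: Int, writeMetrics: ShuffleWriteMetrics): BlockObjectWriter
}
```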

The rest of the patch modifies several write and read paths to use shared serializers. One big win is in `ShuffleBlockFetcherIterator`, where we used to create a new serializer per received block. Similarly, the shuffle write path used to create a new serializer per file even though in many cases only a single thread would be writing to a file at a time.
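Schematically (a hypothetical helper class, not the actual iterator code), the fetch path moves from one instance per block to one instance per single-threaded iterator:

```scala
import java.io.InputStream
import org.apache.spark.serializer.Serializer

class BlockDeserializer(serializer: Serializer) {
  // One instance for the lifetime of this (single-threaded) consumer...
  private val serInstance = serializer.newInstance()

  // ...reused for every fetched block, instead of calling
  // serializer.newInstance() inside this method for each block.
  def deserialize(blockData: InputStream): Iterator[Any] =
    serInstance.deserializeStream(blockData).asIterator
}
```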

I made a small serializer reuse optimization in CoarseGrainedExecutorBackend as well, since it seemed like a small and obvious improvement.

@JoshRosen JoshRosen changed the title [SPARK-3386] Share and reuse SerializerInstances in shuffle code paths [SPARK-3386] Share and reuse SerializerInstances in shuffle paths Apr 21, 2015
```diff
@@ -133,7 +134,8 @@ class FileShuffleBlockManager(conf: SparkConf)
         logWarning(s"Failed to remove existing shuffle file $blockFile")
       }
     }
-    blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
+    blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
```
@JoshRosen (Contributor, Author) commented on this diff:

Note that this line is called once for every bucket (reduce task), since it's enclosed in:

```scala
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
  ...
}
```
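In other words, the instance is now created once and shared by all `numBuckets` writers. A hedged sketch of the pattern, with hypothetical `blockId`/`blockFile` helpers standing in for the surrounding FileShuffleBlockManager code:

```scala
import java.io.File
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BlockManager, BlockObjectWriter}

def openWriters(
    blockManager: BlockManager,
    serializer: Serializer,
    numBuckets: Int,
    bufferSize: Int,
    writeMetrics: ShuffleWriteMetrics)(
    blockId: Int => BlockId,        // hypothetical per-bucket helpers
    blockFile: Int => File): Array[BlockObjectWriter] = {
  // One shared instance for the whole group of writers...
  val serializerInstance = serializer.newInstance()
  Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
    // ...instead of serializer.newInstance() once per bucket, as before.
    blockManager.getDiskWriter(blockId(bucketId), blockFile(bucketId),
      serializerInstance, bufferSize, writeMetrics)
  }
}
```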

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30653 has started for PR 5606 at commit aeb680e.

@JoshRosen (Contributor, Author)

This made a large performance difference in a local-mode Spark SQL test that I was running today, cutting one query's time from 30 seconds to 9 seconds. The benefit may be smaller for non-SQL jobs, since they're less likely to use costly-to-construct serializers.

It would be good to verify that I haven't made any bad thread-safety / single-threadedness assumptions here.

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30654 has started for PR 5606 at commit 64f8398.

```diff
@@ -47,6 +48,11 @@ private[spark] class CoarseGrainedExecutorBackend(
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None

+  // This is a thread-local in case we ever decide to change this to a non-thread-safe RpcEndpoint
+  private[this] val ser: ThreadLocal[SerializerInstance] = new ThreadLocal[SerializerInstance] {
```
A reviewer (Contributor) commented on this diff:

Actually, now that I think about it: if the underlying thread pool keeps creating new threads (which it might), this thread-local variable might lead to a memory leak. Maybe the best way to handle this is your old approach, plus a comment at the beginning of the class saying that if we ever change it to support multiple threads, this needs to change as well.
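A hedged sketch of that alternative, which appears to be what commit f661ce7 ("Remove thread local; add comment instead", in the squashed-commit list below) does. This assumes the surrounding class has an `env: SparkEnv` field; details may differ from the actual code:

```scala
// NOTE: single-threaded on purpose. If CoarseGrainedExecutorBackend is ever
// changed to a multi-threaded RpcEndpoint, this shared instance must become
// per-thread (or otherwise synchronized) as well.
private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()
```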

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30653 has finished for PR 5606 at commit aeb680e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30653/

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30654 has finished for PR 5606 at commit 64f8398.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30654/

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30681 has started for PR 5606 at commit f661ce7.

@JoshRosen (Contributor, Author)

As an example of the speedup that this can give, here's a simple benchmark.

Launch spark-shell with the following options:

```
./bin/spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --master=local[8]
```

Then, paste the following commands into the shell:

```scala
val start = System.currentTimeMillis()
sc.parallelize(1 to 10000, 100).map(x => (x, x)).reduceByKey(_ + _, 100).count()
println(System.currentTimeMillis() - start)
```

Prior to this patch, this takes about 2.5 seconds to run (after a few warmup runs); after this patch, this same query takes around 600ms (after warmup).
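For anyone reproducing this, a small hypothetical timing helper (not part of the patch) that handles the warmup runs:

```scala
// Run `body` a few times to warm up the JIT and caches, then report the
// median wall-clock time of the measured runs, in milliseconds.
def bench(warmup: Int = 3, runs: Int = 5)(body: => Unit): Long = {
  (1 to warmup).foreach(_ => body)
  val times = (1 to runs).map { _ =>
    val start = System.currentTimeMillis()
    body
    System.currentTimeMillis() - start
  }.sorted
  times(times.length / 2)
}

// Usage in the shell:
// bench() { sc.parallelize(1 to 10000, 100).map(x => (x, x)).reduceByKey(_ + _, 100).count() }
```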

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30681 has finished for PR 5606 at commit f661ce7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30681/

@JoshRosen (Contributor, Author)

Jenkins, retest this please.

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30686 has started for PR 5606 at commit f661ce7.

@SparkQA

SparkQA commented Apr 21, 2015

Test build #30686 has finished for PR 5606 at commit f661ce7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30686/

@rxin (Contributor)

rxin commented Apr 21, 2015

LGTM.

@carrino

carrino commented May 20, 2015

Hi. I'm a little late to this party, but I just started using Spark two weeks ago.

My concern with this fix is that it shares serializers but doesn't call `reset()` on the `KryoSerializer` between files. If reference tracking is turned on, this could lead to issues where a later reference may be missing because it points into a different file.
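To make the failure mode concrete, here is a hypothetical illustration with plain Kryo (not Spark code) of how a retained reference table can corrupt a second stream:

```scala
import java.io.FileOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

val kryo = new Kryo()
kryo.setReferences(true)   // reference tracking on
kryo.setAutoReset(false)   // auto-reset off, e.g. via a custom registrator

val payload = Array("some", "shared", "object")

val out1 = new Output(new FileOutputStream("file1.bin"))
kryo.writeClassAndObject(out1, payload)
out1.close()

// Without kryo.reset() here, `payload` is still in the reference table, so
// the second write may emit only a back-reference into file1's stream, which
// a reader of file2 alone cannot resolve.
val out2 = new Output(new FileOutputStream("file2.bin"))
kryo.writeClassAndObject(out2, payload)
out2.close()
```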

@JoshRosen (Contributor, Author)

Hi @carrino,

Thanks for spotting this issue. I've managed to write a regression test which exposes this bug: JoshRosen@71845e3

This is only a problem when reference tracking is enabled and auto-reset has been disabled by a user's custom Kryo registrator. I think that this issue should be fairly easy to fix by adding `reset()` calls at the end of `serialize` and `serializeStream`. I'll file a JIRA for this and put together a pull request shortly, then ping you for review.
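A minimal sketch of the shape of that fix, assuming a simplified serializer instance; per the merged commit message below, the actual fix calls `reset()` at the start of `serialize()` and `serializeStream()`:

```scala
import java.nio.ByteBuffer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

class SimpleKryoInstance(kryo: Kryo, bufferSize: Int, maxBufferSize: Int) {
  def serialize[T](t: T): ByteBuffer = {
    // Clear any reference-table state left over from earlier calls on this
    // shared instance (a no-op when auto-reset already ran).
    kryo.reset()
    val output = new Output(bufferSize, maxBufferSize)
    kryo.writeClassAndObject(output, t)
    ByteBuffer.wrap(output.toBytes)
  }
}
```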

@carrino

carrino commented May 20, 2015

Good times. Thanks for being super responsive.

@JoshRosen (Contributor, Author)

I've filed https://issues.apache.org/jira/browse/SPARK-7766 for this issue and have submitted #6293 to fix this.

asfgit pushed a commit that referenced this pull request May 22, 2015
…s disabled

SPARK-3386 / #5606 modified the shuffle write path to re-use serializer instances across multiple calls to DiskBlockObjectWriter. It turns out that this introduced a very rare bug when using `KryoSerializer`: if auto-reset is disabled and reference-tracking is enabled, then we'll end up re-using the same serializer instance to write multiple output streams without calling `reset()` between write calls, which can lead to cases where objects in one file may contain references to objects that are in previous files, causing errors during deserialization.

This patch fixes this bug by calling `reset()` at the start of `serialize()` and `serializeStream()`. I also added a regression test which demonstrates that this problem only occurs when auto-reset is disabled and reference-tracking is enabled.

Author: Josh Rosen <[email protected]>

Closes #6293 from JoshRosen/kryo-instance-reuse-bug and squashes the following commits:

e19726d [Josh Rosen] Add fix for SPARK-7766.
71845e3 [Josh Rosen] Add failing regression test to trigger Kryo re-use bug
asfgit pushed a commit that referenced this pull request May 22, 2015 (same commit message as above)

(cherry picked from commit eac0069)
Signed-off-by: Josh Rosen <[email protected]>
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015 (same commit message as above, as apache#5606 / apache#6293)
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015 (same commit message as above)
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015 (this PR's squashed commit; its message repeats the PR description above)

Author: Josh Rosen <[email protected]>

Closes apache#5606 from JoshRosen/SPARK-3386 and squashes the following commits:

f661ce7 [Josh Rosen] Remove thread local; add comment instead
64f8398 [Josh Rosen] Use ThreadLocal for serializer instance in CoarseGrainedExecutorBackend
aeb680e [Josh Rosen] [SPARK-3386] Reuse SerializerInstance in shuffle code paths
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015 (same SPARK-7766 commit message as above)