SPARK-729: predictable closure capture #1322
Conversation
This method allows code that needs the currently-active ContextCleaner to access it via a DynamicVariable.
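A minimal sketch of the DynamicVariable pattern this commit describes, using a hypothetical `CleanerContext` holder (not Spark's actual internals; the real change wires this into SparkContext):

```scala
import scala.util.DynamicVariable

// Hypothetical holder for the currently-active cleaner. A DynamicVariable
// scopes the value to a block of execution, so deeply nested code can read
// it without having it threaded through every call.
object CleanerContext {
  private val active = new DynamicVariable[Option[AnyRef]](None)

  // Run `body` with `cleaner` visible as the currently-active cleaner.
  def withCleaner[T](cleaner: AnyRef)(body: => T): T =
    active.withValue(Some(cleaner))(body)

  // Code running inside withCleaner's dynamic extent sees Some(cleaner).
  def currentCleaner: Option[AnyRef] = active.value
}
```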
The two tests added to ClosureCleanerSuite ensure that variable values are captured at RDD definition time, not at job-execution time.
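Illustratively, the property these tests pin down has this shape (assuming `sc` is a live SparkContext; this is not the suite's literal code):

```scala
// A free variable mutated after the RDD is defined must not affect the
// result: `factor` is frozen when map() cleans its closure, not at job time.
var factor = 2
val doubled = sc.parallelize(1 to 3).map(_ * factor) // capture happens here
factor = 0                                           // later mutation is ignored
assert(doubled.collect().toSeq == Seq(2, 4, 6))
```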
The environments of serializable closures are now captured as part of closure cleaning. Since we already proactively check most closures for serializability, ClosureCleaner.clean now returns the result of deserializing the serialized version of the cleaned closure.

Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
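One way to read "returns the result of deserializing the serialized version": a serialization round-trip pins the closure's free variables to their current values. A hedged sketch of that idea (not ClosureCleaner's actual code):

```scala
import java.io._

// Freeze a closure's environment by round-tripping it through Java
// serialization; the returned copy holds the values as of this call.
def freeze[F <: AnyRef](f: F): F = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(f)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  in.readObject().asInstanceOf[F]
}
```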
There are two possible cases for runJob calls: either they are called by RDD action methods from inside Spark or they are called from client code. There's no need to proactively check the closure argument to runJob for serializability or force variable capture in either case:

1. if they are called by RDD actions, their closure arguments consist of mapping an already-serializable closure (with an already-frozen environment) over each partition of the RDD;
2. in both cases, the closure is about to execute, so the benefit of proactively checking for serializability (or ensuring immediate variable capture) is nonexistent.

(Note that ensuring capture via serializability on closure arguments to runJob also causes PySpark accumulators to fail to update.)

Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
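For case 1, a sketch of what an action's runJob closure looks like (illustrative, not RDD.count()'s actual code): the per-partition function only iterates, and any user closure it touches was already cleaned and frozen by the transformation that created the RDD.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// The closure passed to runJob merely walks each partition; re-checking it
// for serializability here would duplicate work done at definition time.
def countElements(sc: SparkContext, rdd: RDD[Int]): Long =
  sc.runJob(rdd, (iter: Iterator[Int]) => iter.size.toLong).sum
```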
This splits the test identifying expected failures due to closure serializability into three cases.
Conflicts:
	core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16383/
Jenkins, add to whitelist and test this please
@rxin you may want to look at this with your broadcast change
QA tests have started for PR 1322. This patch DID NOT merge cleanly!
Matei was referring to #1498
Thanks, @rxin
Alright, just ping me (with @mateiz) when you think it's ready.
BTW @willb, if this is not ready, do you mind closing the PR and resending when it is? We'd like to minimize the number of open PRs that aren't actively being reviewed. |
@mateiz sure; I've tracked down the problem but am a bit stumped by how to fix it. I'll reopen when I have a solution. |
Alright, feel free to describe this on the JIRA too if you'd like input. |
SPARK-729 concerns when free variables in closure arguments to transformations are captured. Currently, it is possible for closures to get the environment in which they are serialized (not the environment in which they are created). This PR causes free variables in closure arguments to RDD transformations to be captured at closure creation time by modifying `ClosureCleaner` to serialize and deserialize its argument.

This PR is based on #189 (which is closed) but has fixes to work with some changes in 1.0. In particular, it ensures that the cloned `Broadcast` objects produced by closure capture are registered with `ContextCleaner` so that broadcast variables won't become invalid simply because variable capture (implemented this way) causes strong references to the original broadcast variables to go away.

(See #189 for additional discussion and background.)
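To make the intended semantics concrete, a hedged illustration of the bug (assuming `sc` is a SparkContext; the values are made up): before this change, re-running an action could observe a later value of a free variable, because the environment was captured at serialization time rather than creation time.

```scala
var v = 1
val rdd = sc.parallelize(Seq(10)).map(_ + v)
val first = rdd.collect()  // Array(11) either way: v is still 1 here
v = 2
val second = rdd.collect() // old behavior could yield Array(12); with this PR: Array(11)
```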