Skip to content

Commit

Permalink
Adds proactive closure-serializability checking
Browse files Browse the repository at this point in the history
ClosureCleaner.clean now checks to ensure that its closure argument
is serializable by default and throws a SparkException with the
underlying NotSerializableException in the detail message otherwise.
As a result, transformation invocations with unserializable closures
will fail at their call sites rather than when they actually execute.

ClosureCleaner.clean now takes a second boolean argument; pass false
to disable serializability-checking behavior at call sites where this
behavior isn't desired.
  • Loading branch information
willb committed Mar 20, 2014
1 parent 21b4b06 commit d8df3db
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, Cl
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._

import org.apache.spark.Logging
import org.apache.spark.SparkEnv
import org.apache.spark.SparkException

private[spark] object ClosureCleaner extends Logging {
// Get an ASM class reader for a given class from the JAR that loaded it
Expand Down Expand Up @@ -101,7 +103,7 @@ private[spark] object ClosureCleaner extends Logging {
}
}

def clean(func: AnyRef) {
def clean(func: AnyRef, checkSerializable: Boolean = true) {
// TODO: cache outerClasses / innerClasses / accessedFields
val outerClasses = getOuterClasses(func)
val innerClasses = getInnerClasses(func)
Expand Down Expand Up @@ -150,6 +152,18 @@ private[spark] object ClosureCleaner extends Logging {
field.setAccessible(true)
field.set(func, outer)
}

if (checkSerializable) {
ensureSerializable(func)
}
}

/**
 * Eagerly serializes `func` with the current environment's closure serializer so that an
 * unserializable closure is reported here instead of when it is eventually executed.
 * The serialized output is discarded; only success or failure matters.
 *
 * The check is skipped when `SparkEnv.get` yields null, since there is then no serializer to
 * test with and dereferencing it would be misreported as a serialization failure.
 *
 * @param func the closure to test-serialize
 * @throws SparkException if serialization fails; the message embeds the underlying exception's
 *         string form and the exception itself is attached as the cause
 */
private def ensureSerializable(func: AnyRef): Unit = {
  // NOTE(review): SparkEnv.get is presumably null when no environment has been installed yet
  // (e.g. outside a running SparkContext) -- confirm; Option(...) turns that into a no-op.
  Option(SparkEnv.get).foreach { env =>
    try {
      env.closureSerializer.newInstance().serialize(func)
    } catch {
      case ex: Exception =>
        // Keep the original message format for callers that inspect it, but also chain the
        // cause so the underlying stack trace is not lost.
        throw new SparkException("Task not serializable: " + ex.toString, ex)
    }
  }
}

private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
Expand Down

0 comments on commit d8df3db

Please sign in to comment.