From d8df3dbbed3e1c9e16a2c2002c04c2d2e6a0b2e2 Mon Sep 17 00:00:00 2001 From: William Benton Date: Thu, 13 Mar 2014 14:40:42 -0500 Subject: [PATCH] Adds proactive closure-serializablilty checking ClosureCleaner.clean now checks to ensure that its closure argument is serializable by default and throws a SparkException with the underlying NotSerializableException in the detail message otherwise. As a result, transformation invocations with unserializable closures will fail at their call sites rather than when they actually execute. ClosureCleaner.clean now takes a second boolean argument; pass false to disable serializability-checking behavior at call sites where this behavior isn't desired. --- .../org/apache/spark/util/ClosureCleaner.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index a8d20ee332355..fe33fa841a0d8 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -26,6 +26,8 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, Cl import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ import org.apache.spark.Logging +import org.apache.spark.SparkEnv +import org.apache.spark.SparkException private[spark] object ClosureCleaner extends Logging { // Get an ASM class reader for a given class from the JAR that loaded it @@ -101,7 +103,7 @@ private[spark] object ClosureCleaner extends Logging { } } - def clean(func: AnyRef) { + def clean(func: AnyRef, checkSerializable: Boolean = true) { // TODO: cache outerClasses / innerClasses / accessedFields val outerClasses = getOuterClasses(func) val innerClasses = getInnerClasses(func) @@ -150,6 +152,18 @@ private[spark] object ClosureCleaner extends Logging { field.setAccessible(true) field.set(func, outer) } + + if (checkSerializable) { + ensureSerializable(func) + } + } + + private def ensureSerializable(func: AnyRef) { + try { + SparkEnv.get.closureSerializer.newInstance().serialize(func) + } catch { + case ex: Exception => throw new SparkException("Task not serializable: " + ex.toString) + } } private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {