diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index a8d20ee332355..fe33fa841a0d8 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -26,6 +26,8 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, Cl import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ import org.apache.spark.Logging +import org.apache.spark.SparkEnv +import org.apache.spark.SparkException private[spark] object ClosureCleaner extends Logging { // Get an ASM class reader for a given class from the JAR that loaded it @@ -101,7 +103,7 @@ private[spark] object ClosureCleaner extends Logging { } } - def clean(func: AnyRef) { + def clean(func: AnyRef, checkSerializable: Boolean = true) { // TODO: cache outerClasses / innerClasses / accessedFields val outerClasses = getOuterClasses(func) val innerClasses = getInnerClasses(func) @@ -150,6 +152,18 @@ private[spark] object ClosureCleaner extends Logging { field.setAccessible(true) field.set(func, outer) } + + if (checkSerializable) { + ensureSerializable(func) + } + } + + private def ensureSerializable(func: AnyRef) { + try { + SparkEnv.get.closureSerializer.newInstance().serialize(func) + } catch { + case ex: Exception => throw new SparkException("Task not serializable: " + ex.toString) + } } private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {