From f4cafa0712be5011ff9dc7e5e6d27a7077095b28 Mon Sep 17 00:00:00 2001 From: William Benton Date: Fri, 4 Apr 2014 17:15:50 -0500 Subject: [PATCH] Stylistic changes and cleanups --- .../scala/org/apache/spark/SparkContext.scala | 19 ++++++++----------- .../apache/spark/util/ClosureCleaner.scala | 8 ++++---- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4c5b35d4025bf..5339d97bb5fb1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1028,21 +1028,18 @@ class SparkContext( def cancelAllJobs() { dagScheduler.cancelAllJobs() } - - /** - * Clean a closure to make it ready to serialized and send to tasks - * (removes unreferenced variables in $outer's, updates REPL variables) - */ - private[spark] def clean[F <: AnyRef : ClassTag](f: F): F = { - clean(f, true) - } - + /** * Clean a closure to make it ready to serialized and send to tasks * (removes unreferenced variables in $outer's, updates REPL variables) + * + * @param f closure to be cleaned and optionally serialized + * @param captureNow whether or not to serialize this closure and capture any free + * variables immediately; defaults to true. If this is set and f is not serializable, + * it will raise an exception. */ - private[spark] def clean[F <: AnyRef : ClassTag](f: F, checkSerializable: Boolean): F = { - ClosureCleaner.clean(f, checkSerializable) + private[spark] def clean[F <: AnyRef : ClassTag](f: F, captureNow: Boolean = true): F = { + ClosureCleaner.clean(f, captureNow) } /** diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 2db06548a1ac2..432278e17e13d 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -105,7 +105,7 @@ private[spark] object ClosureCleaner extends Logging { } } - def clean[F <: AnyRef : ClassTag](func: F, checkSerializable: Boolean = true): F = { + def clean[F <: AnyRef : ClassTag](func: F, captureNow: Boolean = true): F = { // TODO: cache outerClasses / innerClasses / accessedFields val outerClasses = getOuterClasses(func) val innerClasses = getInnerClasses(func) @@ -155,14 +155,14 @@ private[spark] object ClosureCleaner extends Logging { field.set(func, outer) } - if (checkSerializable) { - ensureSerializable(func) + if (captureNow) { + cloneViaSerializing(func) } else { func } } - private def ensureSerializable[T: ClassTag](func: T) = { + private def cloneViaSerializing[T: ClassTag](func: T): T = { try { val serializer = SparkEnv.get.closureSerializer.newInstance() serializer.deserialize[T](serializer.serialize[T](func))