Skip to content

Commit

Permalink
Remove thread local; add comment instead
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Apr 21, 2015
1 parent 64f8398 commit f661ce7
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ private[spark] class CoarseGrainedExecutorBackend(
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None

// This is a thread-local in case we ever decide to change this to a non-thread-safe RpcEndpoint
private[this] val ser: ThreadLocal[SerializerInstance] = new ThreadLocal[SerializerInstance] {
override def initialValue: SerializerInstance = env.closureSerializer.newInstance()
}
// If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
// to be changed so that we don't share the serializer instance across threads
private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

override def onStart() {
import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -89,7 +88,7 @@ private[spark] class CoarseGrainedExecutorBackend(
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
val taskDesc = ser.get().deserialize[TaskDescription](data.value)
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
Expand Down

0 comments on commit f661ce7

Please sign in to comment.