From 3a6b4ef19f44337417cac380764ee020fb31ed64 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 21 Sep 2017 10:20:19 -0700 Subject: [PATCH] [SPARK-21928][CORE] Set classloader on SerializerManager's private kryo We have to make sure that SerializerManager's private instance of kryo also uses the right classloader, regardless of the current thread classloader. In particular, this fixes serde during remote cache fetches, as those occur in netty threads. Manual tests & existing suite via jenkins. I haven't been able to reproduce this in a unit test, because when a remote RDD partition cannot be fetched, there is a warning message and then the partition is just recomputed locally. I manually verified the warning message is no longer present. Author: Imran Rashid Closes #19280 from squito/SPARK-21928_ser_classloader. (cherry picked from commit b75bd1777496ce0354458bf85603a8087a6a0ff8) --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 +++ .../scala/org/apache/spark/serializer/SerializerManager.scala | 4 ++++ .../test/scala/org/apache/spark/executor/ExecutorSuite.scala | 3 ++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 3346f6dd1f975..bc32436dc1c04 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -113,6 +113,9 @@ private[spark] class Executor( // Set the classloader for serializer env.serializer.setDefaultClassLoader(replClassLoader) + // SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads + // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too. + env.serializerManager.setDefaultClassLoader(replClassLoader) // Max size of direct result. If task result is bigger than this, we use the block manager // to send the result back. 
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala index dd98ea265ce4a..5832048f79372 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala @@ -42,6 +42,10 @@ private[spark] class SerializerManager( private[this] val kryoSerializer = new KryoSerializer(conf) + def setDefaultClassLoader(classLoader: ClassLoader): Unit = { + kryoSerializer.setDefaultClassLoader(classLoader) + } + private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]] private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = { val primitiveClassTags = Set[ClassTag[_]]( diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 683eeeeb6d661..6b486fa25efef 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.memory.MemoryManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.{FakeTask, Task} -import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.serializer.{JavaSerializer, SerializerManager} class ExecutorSuite extends SparkFunSuite { @@ -47,6 +47,7 @@ class ExecutorSuite extends SparkFunSuite { val mockMemoryManager = mock(classOf[MemoryManager]) when(mockEnv.conf).thenReturn(conf) when(mockEnv.serializer).thenReturn(serializer) + when(mockEnv.serializerManager).thenReturn(mock(classOf[SerializerManager])) when(mockEnv.rpcEnv).thenReturn(mockRpcEnv) when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem) when(mockEnv.memoryManager).thenReturn(mockMemoryManager)