diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 8300f9f2190b9..8af46f3327adb 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.serializer.SerializerInstance import org.apache.spark.util.{SignalLogger, Utils} private[spark] class CoarseGrainedExecutorBackend( @@ -47,6 +48,10 @@ private[spark] class CoarseGrainedExecutorBackend( var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None + // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need + // to be changed so that we don't share the serializer instance across threads + private[this] val ser: SerializerInstance = env.closureSerializer.newInstance() + override def onStart() { import scala.concurrent.ExecutionContext.Implicits.global logInfo("Connecting to driver: " + driverUrl) @@ -83,7 +88,6 @@ private[spark] class CoarseGrainedExecutorBackend( logError("Received LaunchTask command but executor was null") System.exit(1) } else { - val ser = env.closureSerializer.newInstance() val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala index 5be3ed771e534..538e150ead05a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala @@ -113,11 +113,12 @@ class FileShuffleBlockManager(conf: SparkConf) private var fileGroup: ShuffleFileGroup = null val openStartTime = System.nanoTime + val serializerInstance = serializer.newInstance() val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) { fileGroup = getUnusedFileGroup() Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize, + blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize, writeMetrics) } } else { @@ -133,7 +134,8 @@ class FileShuffleBlockManager(conf: SparkConf) logWarning(s"Failed to remove existing shuffle file $blockFile") } } - blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics) + blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize, + writeMetrics) } } // Creating the file to write to and creating a disk writer both involve interacting with diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1aa0ef18de118..145a9c1ae3391 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -37,7 +37,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.ExternalShuffleClient import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.rpc.RpcEnv -import org.apache.spark.serializer.Serializer +import org.apache.spark.serializer.{SerializerInstance, Serializer} import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.util._ @@ -646,13 +646,13 @@ private[spark] class BlockManager( def getDiskWriter( blockId: BlockId, file: File, - serializer: Serializer, + serializerInstance: SerializerInstance, bufferSize: Int, writeMetrics: ShuffleWriteMetrics): BlockObjectWriter = { val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) val syncWrites = conf.getBoolean("spark.shuffle.sync", false) - new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites, - writeMetrics) + new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream, + syncWrites, writeMetrics) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 0dfc91dfaff85..14833791f7a4d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -21,7 +21,7 @@ import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream} import java.nio.channels.FileChannel import org.apache.spark.Logging -import org.apache.spark.serializer.{SerializationStream, Serializer} +import org.apache.spark.serializer.{SerializerInstance, SerializationStream} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.util.Utils @@ -71,7 +71,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) { private[spark] class DiskBlockObjectWriter( blockId: BlockId, file: File, - serializer: Serializer, + serializerInstance: SerializerInstance, bufferSize: Int, compressStream: OutputStream => OutputStream, syncWrites: Boolean, @@ -134,7 +134,7 @@ private[spark] class DiskBlockObjectWriter( ts = new TimeTrackingOutputStream(fos) channel = fos.getChannel() bs = compressStream(new BufferedOutputStream(ts, bufferSize)) - objOut = serializer.newInstance().serializeStream(bs) + objOut = serializerInstance.serializeStream(bs) initialized = true this } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 8f28ef49a8a6f..f3379521d55e2 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -27,7 +27,7 @@ import org.apache.spark.{Logging, TaskContext} import org.apache.spark.network.BlockTransferService import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient} import org.apache.spark.network.buffer.ManagedBuffer -import org.apache.spark.serializer.Serializer +import org.apache.spark.serializer.{SerializerInstance, Serializer} import org.apache.spark.util.{CompletionIterator, Utils} /** @@ -106,6 +106,8 @@ final class ShuffleBlockFetcherIterator( private[this] val shuffleMetrics = context.taskMetrics.createShuffleReadMetricsForDependency() + private[this] val serializerInstance: SerializerInstance = serializer.newInstance() + /** * Whether the iterator is still active. If isZombie is true, the callback interface will no * longer place fetched blocks into [[results]]. @@ -299,7 +301,7 @@ final class ShuffleBlockFetcherIterator( // the scheduler gets a FetchFailedException. Try(buf.createInputStream()).map { is0 => val is = blockManager.wrapForCompression(blockId, is0) - val iter = serializer.newInstance().deserializeStream(is).asIterator + val iter = serializerInstance.deserializeStream(is).asIterator CompletionIterator[Any, Iterator[Any]](iter, { // Once the iterator is exhausted, release the buffer and set currentResult to null // so we don't release it again in cleanup. diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 9ff4744593d4d..30dd7f22e494f 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -151,8 +151,7 @@ class ExternalAppendOnlyMap[K, V, C]( override protected[this] def spill(collection: SizeTracker): Unit = { val (blockId, file) = diskBlockManager.createTempLocalBlock() curWriteMetrics = new ShuffleWriteMetrics() - var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize, - curWriteMetrics) + var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics) var objectsWritten = 0 // List of batch sizes (bytes) in the order they are written to disk @@ -179,8 +178,7 @@ class ExternalAppendOnlyMap[K, V, C]( if (objectsWritten == serializerBatchSize) { flush() curWriteMetrics = new ShuffleWriteMetrics() - writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize, - curWriteMetrics) + writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics) } } if (objectsWritten > 0) { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 035f3767ff554..79a1a8a0dae38 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -272,7 +272,8 @@ private[spark] class ExternalSorter[K, V, C]( // createTempShuffleBlock here; see SPARK-3426 for more context. val (blockId, file) = diskBlockManager.createTempShuffleBlock() curWriteMetrics = new ShuffleWriteMetrics() - var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics) + var writer = blockManager.getDiskWriter( + blockId, file, serInstance, fileBufferSize, curWriteMetrics) var objectsWritten = 0 // Objects written since the last flush // List of batch sizes (bytes) in the order they are written to disk @@ -308,7 +309,8 @@ private[spark] class ExternalSorter[K, V, C]( if (objectsWritten == serializerBatchSize) { flush() curWriteMetrics = new ShuffleWriteMetrics() - writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics) + writer = blockManager.getDiskWriter( + blockId, file, serInstance, fileBufferSize, curWriteMetrics) } } if (objectsWritten > 0) { @@ -358,7 +360,9 @@ private[spark] class ExternalSorter[K, V, C]( // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use // createTempShuffleBlock here; see SPARK-3426 for more context. val (blockId, file) = diskBlockManager.createTempShuffleBlock() - blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open() + val writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, + curWriteMetrics) + writer.open() } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be @@ -749,8 +753,8 @@ private[spark] class ExternalSorter[K, V, C]( // partition and just write everything directly. for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { - val writer = blockManager.getDiskWriter( - blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) + val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, + context.taskMetrics.shuffleWriteMetrics.get) for (elem <- elements) { writer.write(elem) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala index 78bbc4ec2c620..003a728cb84a0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala @@ -30,7 +30,7 @@ class BlockObjectWriterSuite extends FunSuite { val file = new File(Utils.createTempDir(), "somefile") val writeMetrics = new ShuffleWriteMetrics() val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics) + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.write(Long.box(20)) // Record metrics update on every write @@ -52,7 +52,7 @@ class BlockObjectWriterSuite extends FunSuite { val file = new File(Utils.createTempDir(), "somefile") val writeMetrics = new ShuffleWriteMetrics() val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics) + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.write(Long.box(20)) // Record metrics update on every write @@ -75,7 +75,7 @@ class BlockObjectWriterSuite extends FunSuite { val file = new File(Utils.createTempDir(), "somefile") val writeMetrics = new ShuffleWriteMetrics() val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, - new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics) + new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics) writer.open() writer.close()