From 7ea594e7876258296f340daddefcaf71a64ab824 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Fri, 2 Nov 2018 13:24:55 -0700
Subject: [PATCH] [SPARK-25827][CORE] Avoid converting incoming encrypted
 blocks to byte buffers

## What changes were proposed in this pull request?

Avoid converting encrypted blocks to regular ByteBuffers, to ensure they can
be sent over the network for replication & remote reads even when > 2GB.

Also updates some TODOs with links to SPARK-25905 for improving the handling
here.

## How was this patch tested?

Tested on a cluster with encrypted data > 2GB (after SPARK-25904 was applied
as well).

Closes #22917 from squito/real_SPARK-25827.

Authored-by: Imran Rashid
Signed-off-by: Marcelo Vanzin
---
 .../org/apache/spark/network/BlockTransferService.scala     | 4 +++-
 .../main/scala/org/apache/spark/storage/BlockManager.scala  | 2 +-
 .../src/main/scala/org/apache/spark/storage/DiskStore.scala | 5 +++--
 .../scala/org/apache/spark/util/io/ChunkedByteBuffer.scala  | 6 ++++--
 4 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index eef8c31e05ab1..a58c8fa2e763f 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ShuffleClient}
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, EncryptedManagedBuffer, StorageLevel}
 import org.apache.spark.util.ThreadUtils
 
 private[spark]
@@ -104,6 +104,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
           data match {
             case f: FileSegmentManagedBuffer =>
               result.success(f)
+            case e: EncryptedManagedBuffer =>
+              result.success(e)
             case _ =>
               val ret = ByteBuffer.allocate(data.size.toInt)
               ret.put(data.nioByteBuffer())
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c01a453151911..e35dd72521247 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -721,7 +721,7 @@ private[spark] class BlockManager(
    * Get block from remote block managers as serialized bytes.
    */
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
+    // TODO SPARK-25905 if we change this method to return the ManagedBuffer, then getRemoteValues
     // could just use the inputStream on the temp file, rather than reading the file into memory.
     // Until then, replication can cause the process to use too much memory and get killed
     // even though we've read the data to disk.
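The new `EncryptedManagedBuffer` case above matters because the fallback path
copies the whole block into a single on-heap buffer via
`ByteBuffer.allocate(data.size.toInt)`, and `java.nio.ByteBuffer` capacities
are `Int`-sized. A minimal standalone sketch (not part of this patch) showing
how the `toInt` conversion breaks for blocks over 2GB:

```scala
import java.nio.ByteBuffer

object TwoGBLimitDemo {
  def main(args: Array[String]): Unit = {
    val blockSize: Long = 3L * 1024 * 1024 * 1024 // a 3 GB block, as a Long
    // blockSize.toInt wraps around to a negative value (-1073741824), so
    // allocate() throws IllegalArgumentException instead of allocating.
    try {
      ByteBuffer.allocate(blockSize.toInt)
    } catch {
      case e: IllegalArgumentException =>
        println(s"allocate(${blockSize.toInt}) failed: $e")
    }
  }
}
```

Passing the `EncryptedManagedBuffer` through unconverted, as the case above
does for `FileSegmentManagedBuffer`, never hits this allocation at all.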
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index d88bd710d1ead..841e16afc7549 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -201,7 +201,7 @@ private class DiskBlockData(
   private def open() = new FileInputStream(file).getChannel
 }
 
-private class EncryptedBlockData(
+private[spark] class EncryptedBlockData(
     file: File,
     blockSize: Long,
     conf: SparkConf,
@@ -263,7 +263,8 @@ private class EncryptedBlockData(
   }
 }
 
-private class EncryptedManagedBuffer(val blockData: EncryptedBlockData) extends ManagedBuffer {
+private[spark] class EncryptedManagedBuffer(
+    val blockData: EncryptedBlockData) extends ManagedBuffer {
 
   // This is the size of the decrypted data
   override def size(): Long = blockData.size
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 9547cb49bbee8..da2be84723a07 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -29,7 +29,7 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
-import org.apache.spark.storage.StorageUtils
+import org.apache.spark.storage.{EncryptedManagedBuffer, StorageUtils}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 
@@ -173,11 +173,13 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
 
 private[spark] object ChunkedByteBuffer {
 
-  // TODO eliminate this method if we switch BlockManager to getting InputStreams
+  // TODO SPARK-25905 eliminate this method if we switch BlockManager to getting InputStreams
   def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
         fromFile(f.getFile, f.getOffset, f.getLength)
+      case e: EncryptedManagedBuffer =>
+        e.blockData.toChunkedByteBuffer(ByteBuffer.allocate _)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
     }
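On the `ChunkedByteBuffer` side, the fix relies on chunking to sidestep the
same limit: a block larger than 2GB is represented as several `ByteBuffer`s,
each within `Int.MaxValue` bytes, which is why `toChunkedByteBuffer` takes an
allocator function (`ByteBuffer.allocate _`) rather than allocating one buffer
up front. A rough sketch of that idea, using a hypothetical `chunkSizes`
helper rather than Spark's actual implementation:

```scala
import java.nio.ByteBuffer

// Hypothetical sketch (not Spark's code): split a logical block into
// Int-sized chunk lengths so no single ByteBuffer exceeds the 2 GB cap.
def chunkSizes(totalSize: Long, chunkSize: Int = 64 * 1024 * 1024): Seq[Int] = {
  require(totalSize >= 0 && chunkSize > 0)
  val full = Seq.fill((totalSize / chunkSize).toInt)(chunkSize)
  val rem = (totalSize % chunkSize).toInt
  if (rem > 0) full :+ rem else full
}

val threeGB = 3L * 1024 * 1024 * 1024
val sizes = chunkSizes(threeGB)
// 48 chunks of 64 MB are representable, while one 3 GB ByteBuffer is not.
assert(sizes.length == 48 && sizes.map(_.toLong).sum == threeGB)
// Each chunk would then be filled via an allocator such as
// ByteBuffer.allocate, matching the shape of the argument in the diff above.
```

Spark's real `EncryptedBlockData.toChunkedByteBuffer` fills chunks like these
from a decrypting channel, so the decrypted data never has to fit into one
contiguous buffer.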