[SPARK-25827][CORE] Avoid converting incoming encrypted blocks to byte buffers

## What changes were proposed in this pull request?

Avoid converting encrypted blocks to regular ByteBuffers, to ensure they can be sent over the network for replication and remote reads even when they are larger than 2GB.
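For context (not part of the patch): `java.nio.ByteBuffer` is indexed by `Int`, so any path that copies a whole block into a single buffer is capped at `Int.MaxValue` (~2GB) bytes. A minimal sketch of the failure mode:

```scala
import java.nio.ByteBuffer

// A block just over the 2GB line: its size no longer fits in an Int.
val blockSize: Long = 3L * 1024 * 1024 * 1024

// Long-to-Int truncation turns the size negative...
println(blockSize.toInt)  // -1073741824

// ...so a single-buffer copy like the pre-patch fallback path cannot work:
// ByteBuffer.allocate(blockSize.toInt)  // throws IllegalArgumentException
```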

Also updates some TODOs with links to SPARK-25905, which tracks improving the handling here.

## How was this patch tested?

Tested on a cluster with encrypted data > 2GB (after SPARK-25904 was
applied as well).

Closes #22917 from squito/real_SPARK-25827.

Authored-by: Imran Rashid <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
squito authored and Marcelo Vanzin committed Nov 2, 2018
1 parent c71db43 commit 7ea594e
Showing 4 changed files with 11 additions and 6 deletions.
core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ShuffleClient}
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, EncryptedManagedBuffer, StorageLevel}
 import org.apache.spark.util.ThreadUtils
 
 private[spark]
@@ -104,6 +104,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging
         data match {
           case f: FileSegmentManagedBuffer =>
             result.success(f)
+          case e: EncryptedManagedBuffer =>
+            result.success(e)
           case _ =>
             val ret = ByteBuffer.allocate(data.size.toInt)
             ret.put(data.nioByteBuffer())
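The new case follows the pattern already used for `FileSegmentManagedBuffer`: buffers that can serve their bytes lazily are handed through untouched, and only genuinely in-memory buffers are copied. A simplified sketch of that dispatch (the helper name `passThroughOrCopy` is hypothetical; the class names come from the diff):

```scala
import java.nio.ByteBuffer
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
import org.apache.spark.storage.EncryptedManagedBuffer

// Hypothetical helper illustrating the dispatch above: never force a
// disk-backed or encrypted buffer through a single in-memory ByteBuffer.
def passThroughOrCopy(data: ManagedBuffer): ManagedBuffer = data match {
  case f: FileSegmentManagedBuffer => f  // file-backed, streams from disk
  case e: EncryptedManagedBuffer   => e  // decrypts lazily, may exceed 2GB
  case other =>                          // in-memory, so size fits an Int
    val ret = ByteBuffer.allocate(other.size.toInt)
    ret.put(other.nioByteBuffer())
    ret.flip()
    new NioManagedBuffer(ret)
}
```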
core/src/main/scala/org/apache/spark/storage/BlockManager.scala

@@ -721,7 +721,7 @@ private[spark] class BlockManager(
    * Get block from remote block managers as serialized bytes.
    */
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
+    // TODO SPARK-25905 if we change this method to return the ManagedBuffer, then getRemoteValues
     // could just use the inputStream on the temp file, rather than reading the file into memory.
     // Until then, replication can cause the process to use too much memory and get killed
     // even though we've read the data to disk.
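The TODO's direction can be sketched as follows; this is a hypothetical reading of SPARK-25905, not code from this commit. Returning the `ManagedBuffer` would let callers stream the block instead of materializing it:

```scala
import java.io.InputStream
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.BlockId

// Hypothetical shape of the SPARK-25905 idea: hand back the ManagedBuffer
// and let the consumer stream from it (e.g. off a temp file on disk),
// instead of eagerly building an in-memory ChunkedByteBuffer.
trait RemoteBlockReader {
  def getRemoteManagedBuffer(blockId: BlockId): Option[ManagedBuffer]

  def getRemoteStream(blockId: BlockId): Option[InputStream] =
    getRemoteManagedBuffer(blockId).map(_.createInputStream())
}
```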
core/src/main/scala/org/apache/spark/storage/DiskStore.scala (3 additions, 2 deletions)

@@ -201,7 +201,7 @@ private class DiskBlockData(
   private def open() = new FileInputStream(file).getChannel
 }
 
-private class EncryptedBlockData(
+private[spark] class EncryptedBlockData(
     file: File,
     blockSize: Long,
     conf: SparkConf,
@@ -263,7 +263,8 @@ private class EncryptedBlockData(
     }
   }
 
-private class EncryptedManagedBuffer(val blockData: EncryptedBlockData) extends ManagedBuffer {
+private[spark] class EncryptedManagedBuffer(
+    val blockData: EncryptedBlockData) extends ManagedBuffer {
 
   // This is the size of the decrypted data
   override def size(): Long = blockData.size
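For reference, here is the surface `EncryptedManagedBuffer` has to satisfy, paraphrased from `org.apache.spark.network.buffer.ManagedBuffer` (a Java class; signatures summarized, not copied from the diff):

```scala
// Paraphrased sketch of the ManagedBuffer contract that
// EncryptedManagedBuffer implements by delegating to EncryptedBlockData.
abstract class ManagedBufferContract {
  def size(): Long                              // logical (here: decrypted) byte count
  def nioByteBuffer(): java.nio.ByteBuffer      // materialized view; unsafe for > 2GB
  def createInputStream(): java.io.InputStream  // streaming view, size-agnostic
  def retain(): ManagedBufferContract           // reference-count management
  def release(): ManagedBufferContract
  def convertToNetty(): AnyRef                  // object Netty can write to the wire
}
```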
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala

@@ -29,7 +29,7 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
-import org.apache.spark.storage.StorageUtils
+import org.apache.spark.storage.{EncryptedManagedBuffer, StorageUtils}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils

@@ -173,11 +173,13 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
 private[spark] object ChunkedByteBuffer {
 
 
-  // TODO eliminate this method if we switch BlockManager to getting InputStreams
+  // TODO SPARK-25905 eliminate this method if we switch BlockManager to getting InputStreams
   def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
         fromFile(f.getFile, f.getOffset, f.getLength)
+      case e: EncryptedManagedBuffer =>
+        e.blockData.toChunkedByteBuffer(ByteBuffer.allocate _)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
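The allocator argument (`ByteBuffer.allocate _`) is what keeps this safe for large blocks: `toChunkedByteBuffer` fills a sequence of bounded chunks, so only each chunk, never the total, has to fit in an `Int`. A standalone sketch of that chunking idea (hypothetical helper, not Spark's implementation):

```scala
import java.nio.ByteBuffer

// Hypothetical illustration: split `total` bytes across fixed-size chunks
// so no single call needs total.toInt, which would overflow past 2GB.
def allocateChunks(total: Long, chunkSize: Int, alloc: Int => ByteBuffer): Array[ByteBuffer] = {
  val numChunks = ((total + chunkSize - 1) / chunkSize).toInt
  Array.tabulate(numChunks) { i =>
    val len = math.min(chunkSize.toLong, total - i.toLong * chunkSize).toInt
    alloc(len)  // each allocation is at most chunkSize bytes
  }
}

// 3GB of logical capacity in 64MB chunks: 48 allocations, none overflowing.
// (Shrink the numbers to actually run this on a small heap.)
val chunks = allocateChunks(3L * 1024 * 1024 * 1024, 64 * 1024 * 1024, ByteBuffer.allocate _)
```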
