From 7f18715ba3137f2faf224b8c9b6d7eef076851db Mon Sep 17 00:00:00 2001
From: Eugene-Mark
Date: Fri, 20 Aug 2021 16:41:09 +0800
Subject: [PATCH] [PMEM-SHUFFLE-46] Fix the bug that off-heap memory is over
 used in shuffle reduce stage

---
 .../apache/spark/shuffle/pmof/BaseShuffleReader.scala  |  2 +-
 .../apache/spark/shuffle/pmof/PmofShuffleManager.scala |  2 +-
 .../spark/storage/pmof/PmemBlockInputStream.scala      |  8 +++++---
 .../spark/storage/pmof/PmemBlockOutputStream.scala     |  4 ++--
 .../apache/spark/storage/pmof/PmemOutputStream.scala   | 10 +++++-----
 .../util/collection/pmof/PmemExternalSorter.scala      |  3 +--
 6 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala
index 5e367bfa..98371a59 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala
@@ -90,7 +90,7 @@ private[spark] class BaseShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
     /**
      * Force iterator to traverse itself and update internal counter
      **/
-    wrappedStreams.size
+    //wrappedStreams.size
 
     val serializerInstance = dep.serializer.newInstance()
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala
index 51a6b3bc..6e056430 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala
@@ -62,7 +62,6 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager
   override def getReader[K, C](handle: _root_.org.apache.spark.shuffle.ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: _root_.org.apache.spark.TaskContext, readMetrics: ShuffleReadMetricsReporter): _root_.org.apache.spark.shuffle.ShuffleReader[K, C] = {
     val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
       handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
-    val env: SparkEnv = SparkEnv.get
     if (pmofConf.enableRemotePmem) {
       new RpmpShuffleReader(
         handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
@@ -73,6 +72,7 @@
         context,
         pmofConf)
     } else if (pmofConf.enableRdma) {
+      val env: SparkEnv = SparkEnv.get
       metadataResolver = MetadataResolver.getMetadataResolver(pmofConf)
       PmofTransferService.getTransferServiceInstance(pmofConf, env.blockManager, this)
       new RdmaShuffleReader(
diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockInputStream.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockInputStream.scala
index ff92982c..7610cb2f 100644
--- a/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockInputStream.scala
+++ b/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockInputStream.scala
@@ -14,10 +14,10 @@ trait PmemBlockInputStream[K, C] {
 }
 
 class LocalPmemBlockInputStream[K, C](
-    blockId: BlockId,
-    total_records: Long,
+    pmemBlockOutputStream: PmemBlockOutputStream,
     serializer: Serializer)
   extends PmemBlockInputStream[K, C] {
+  val blockId: BlockId = pmemBlockOutputStream.getBlockId()
   val serializerManager: SerializerManager = SparkEnv.get.serializerManager
   val serInstance: SerializerInstance = serializer.newInstance()
   val persistentMemoryWriter: PersistentMemoryHandler =
@@ -27,7 +27,9 @@ class LocalPmemBlockInputStream[K, C](
   var inObjStream: DeserializationStream = serInstance.deserializeStream(wrappedStream)
   var indexInBatch: Int = 0
+  var total_records: Long = 0
   var closing: Boolean = false
+  total_records = pmemBlockOutputStream.getTotalRecords()
 
   def readNextItem(): (K, C) = {
     if (closing == true) {
@@ -126,4 +128,4 @@ class RemotePmemBlockInputStream[K, C](
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala
index 55430bf0..976f15e7 100644
--- a/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala
@@ -95,13 +95,13 @@ private[spark] class PmemBlockOutputStream(
     if ((pmofConf.spill_throttle != -1 && pmemOutputStream.bufferRemainingSize >= pmofConf.spill_throttle) || force == true) {
       val start = System.nanoTime()
       flush()
-      //pmemOutputStream.doFlush()
+      pmemOutputStream.doFlush()
       val bufSize = pmemOutputStream.flushedSize
       mapStatus += ((pmemOutputStream.flushed_block_id, bufSize, recordsPerBlock))
       if (bufSize > 0) {
         recordsArray += recordsPerBlock
         recordsPerBlock = 0
-        size += bufSize
+        size = bufSize
 
         if (blockId.isShuffle == true) {
           val writeMetrics = taskMetrics.shuffleWriteMetrics
diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
index 98efe403..00660144 100644
--- a/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
@@ -19,7 +19,7 @@ class PmemOutputStream(
   var is_closed = false
   var key_id = 0
 
-  val length: Int = 1024 * 1024 * 6
+  val length: Int = bufferSize
   var bufferFlushedSize: Int = 0
   var bufferRemainingSize: Int = 0
   val buf: ByteBuf = NettyByteBufferPool.allocateFlexibleNewBuffer(length)
@@ -37,6 +37,10 @@
   }
 
   override def flush(): Unit = {
+
+  }
+
+  def doFlush(): Unit = {
     if (bufferRemainingSize > 0) {
       if (remotePersistentMemoryPool != null) {
         logDebug(s" [PUT Started]${cur_block_id}-${bufferRemainingSize}")
@@ -73,10 +77,6 @@
     }
   }
 
-  def doFlush(): Unit = {
-
-  }
-
   def flushedSize(): Int = {
     bufferFlushedSize
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala
index 3c4e47d0..8f511742 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala
@@ -337,8 +337,7 @@ private[spark] class PmemExternalSorter[K, V, C](
     // which is different from spark original codes (relate to one spill file)
     val pmemBlockInputStream = if (!pmofConf.enableRemotePmem) {
       new LocalPmemBlockInputStream[K, C](
-        pmemBlockOutputStream.getBlockId,
-        pmemBlockOutputStream.getTotalRecords,
+        pmemBlockOutputStream,
        serializer)
     } else {
       new RemotePmemBlockInputStream[K, C](
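
Note: the pivotal change above is in PmemOutputStream, where flush() becomes
a no-op and its old body moves to a new doFlush() that PmemBlockOutputStream
now invokes explicitly once the spill throttle is reached. Because
java.io.OutputStream.flush() is also called by the serialization streams
layered on top, honoring every such call could force out partially filled
off-heap buffers, which is consistent with the over-use this patch fixes.
The sketch below illustrates that decoupling in plain Scala; the names
ThrottledPmemStream and sink are hypothetical, chosen for illustration, and
a simple byte array stands in for the pooled Netty ByteBuf of the real code:

    import java.io.OutputStream

    // Sketch of the flush()/doFlush() split under the assumptions above.
    class ThrottledPmemStream(bufferSize: Int, sink: Array[Byte] => Unit)
        extends OutputStream {
      private val buf = new Array[Byte](bufferSize)
      private var pos = 0

      override def write(b: Int): Unit = {
        // Bounds handling omitted for brevity.
        buf(pos) = b.toByte
        pos += 1
      }

      // Deliberate no-op: upstream serializers call flush() on their own
      // schedule, and acting on every call would spill partial buffers.
      override def flush(): Unit = {}

      // The owning writer decides when bytes actually move, e.g. when its
      // spill throttle is reached.
      def doFlush(): Unit =
        if (pos > 0) {
          sink(buf.slice(0, pos))
          pos = 0
        }
    }

    object ThrottledPmemStreamDemo extends App {
      val out = new ThrottledPmemStream(16, bytes => println(s"wrote ${bytes.length} bytes"))
      out.write(Array[Byte](1, 2, 3))
      out.flush()   // no effect by design
      out.doFlush() // prints: wrote 3 bytes
    }

The same ownership shows up in PmemBlockOutputStream above: it now calls
pmemOutputStream.doFlush() right where the commented-out call used to sit,
keeping the decision to flush in a single place.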