This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[PMEM-SHUFFLE-15] Further optimization and documentation update for the integration with Spark 3.1.1 #20

Merged
README.md (2 additions, 0 deletions)
@@ -581,6 +581,8 @@ Use `spark.shuffle.pmof.pmpool_size` to specify the size of created shuffle file
**Misc**
The config `spark.sql.shuffle.partitions` must be set explicitly; it's suggested to keep the default value `200` unless you are sure what this value means.

Since Spark 3.1.1, please explicitly set `spark.shuffle.readHostLocalDisk` to `false`.

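As a minimal sketch (not part of this change), the two settings above could also be applied programmatically through a `SparkSession` builder; the application name below is purely illustrative, and the same values can be passed via `--conf` or `spark-defaults.conf` instead:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: apply the settings recommended above when building the session.
val spark = SparkSession.builder()
  .appName("pmem-shuffle-example")                    // hypothetical application name
  .config("spark.sql.shuffle.partitions", "200")      // set explicitly; 200 is the suggested default
  .config("spark.shuffle.readHostLocalDisk", "false") // required for Spark 3.1.1 and later
  .getOrCreate()
```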
## <a id="pmem-shuffle-for-spark-testing"></a>7. PMem Shuffle for Spark Testing
-----------------------------

BaseShuffleReader.scala
@@ -126,19 +126,13 @@ private[spark] class BaseShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
 }

 // Sort the output if there is a sort ordering defined.
-val resultIter = dep.keyOrdering match {
+val resultIter: Iterator[Product2[K, C]] = dep.keyOrdering match {
   case Some(keyOrd: Ordering[K]) =>
     assert(pmofConf.enablePmem)
     // Create an ExternalSorter to sort the data.
     val sorter =
       new PmemExternalSorter[K, C, C](context, handle, pmofConf, ordering = Some(keyOrd), serializer = dep.serializer)
-    logDebug("call PmemExternalSorter.insertAll for shuffle_0_" + handle.shuffleId + "_[" + startPartition + "," + endPartition + "]")
-    sorter.insertAll(aggregatedIter)
-    // Use completion callback to stop sorter if task was finished/cancelled.
-    context.addTaskCompletionListener[Unit](_ => {
-      sorter.stop()
-    })
-    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
+    sorter.insertAllAndUpdateMetrics(aggregatedIter)
   case None =>
     aggregatedIter
 }
PmemShuffleBlockResolver.scala
@@ -7,13 +7,21 @@ import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockId}
import org.apache.spark.storage.pmof.{PersistentMemoryHandler, PmemBlockOutputStream }
import org.apache.spark.network.buffer.ManagedBuffer

/**
 * The mapping between a logical block and its physical file location.
 */
private[spark] class PmemShuffleBlockResolver(
conf: SparkConf,
_blockManager: BlockManager = null)
extends IndexShuffleBlockResolver(conf, _blockManager) with Logging {

var partitionBufferArray: Array[PmemBlockOutputStream] = _

/**
 * Retrieve the data for the specified block.
 */
override def getBlockData(blockId: BlockId, dirs: Option[Array[String]]): ManagedBuffer = {
// Return the ManagedBuffer corresponding to the given BlockId.
val persistentMemoryHandler = PersistentMemoryHandler.getPersistentMemoryHandler
@@ -25,6 +33,9 @@ private[spark] class PmemShuffleBlockResolver(
super.stop()
}

/**
 * Remove blocks from persistent memory based on block id.
 */
override def removeDataByMap(shuffleId: ShuffleId, mapId: Long): Unit ={
val partitionNumber = conf.get("spark.sql.shuffle.partitions")
val persistentMemoryHandler = PersistentMemoryHandler.getPersistentMemoryHandler
PmemExternalSorter.scala
@@ -11,6 +11,7 @@ import org.apache.spark.shuffle.BaseShuffleHandle
import org.apache.spark.util.collection._
import org.apache.spark.storage.pmof._
import org.apache.spark.util.configuration.pmof.PmofConf
import org.apache.spark.util.{CompletionIterator, Utils => TryUtils}

private[spark] class PmemExternalSorter[K, V, C](
context: TaskContext,
@@ -354,4 +355,20 @@
}
}
}

/**
 * Inserts all records, updates the related task metrics, and returns a completion iterator
 * over all the data written to this object, aggregated by our aggregator.
 * On task completion (success, failure, or cancellation), it releases resources by
 * calling `stop()`.
 */
def insertAllAndUpdateMetrics(records: Iterator[Product2[K, V]]): Iterator[Product2[K, C]] = {
insertAll(records)
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was finished/cancelled.
context.addTaskCompletionListener[Unit](_ => stop())
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](iterator, stop())
}
}