This repository has been archived by the owner on Nov 14, 2022. It is now read-only.

[POAE7-858] disable RDD cache related PMem initialization by default and add PMem related logic in SparkEnv #18

Merged 4 commits on Mar 8, 2021.
Changes from all commits:
@@ -35,7 +35,7 @@ public PMemReaderForUnsafeExternalSorter(
     this.numRecordsRemaining = numRecords - position/2;
     this.taskMetrics = taskMetrics;
     int readBufferSize = SparkEnv.get() == null? 8 * 1024 * 1024 :
-        (int) SparkEnv.get().conf().get(package$.MODULE$.MEMORY_SPILL_PMEM_READ_BUFFERSIZE());
+        (int) (long) SparkEnv.get().conf().get(package$.MODULE$.MEMORY_SPILL_PMEM_READ_BUFFERSIZE());

Contributor (author):
This code follows similar usages in vanilla Spark.

Reply: Interesting; I can't figure out why they didn't use the getLong API. That seems better.

     logger.info("PMem read buffer size is:" + Utils.bytesToString(readBufferSize));
     this.byteBuffer = ByteBuffer.wrap(new byte[readBufferSize]);
     byteBuffer.flip();
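Why the double cast compiles and a single cast does not: from Java, conf().get(...) on a long-valued config entry returns a boxed java.lang.Long typed as Object, so (int) alone attempts an Object-to-Integer cast and fails at runtime, while (long) first unboxes and (int) then narrows. A minimal sketch of both the failure mode and the getLong alternative from the reply; the string key name below is an assumption for illustration only:

import org.apache.spark.SparkConf;

public class ReadBufferCastSketch {
    public static void main(String[] args) {
        // What conf().get(longEntry) looks like from Java: a boxed Long
        // seen as Object.
        Object raw = Long.valueOf(8 * 1024 * 1024);
        // int bad = (int) raw;      // would throw ClassCastException
        int ok = (int) (long) raw;   // unbox to long, then narrow to int
        System.out.println(ok);

        // The getLong alternative the reply suggests; the key
        // "spark.memory.spill.pmem.readBufferSize" is assumed here.
        SparkConf conf = new SparkConf();
        int readBufferSize = (int) conf.getLong(
            "spark.memory.spill.pmem.readBufferSize", 8 * 1024 * 1024);
        System.out.println(readBufferSize);
    }
}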
@@ -95,7 +95,7 @@ public void write() throws IOException {
     long writeDuration = 0;
     ExecutorService executorService = Executors.newSingleThreadExecutor();
     Future<Long> future = executorService.submit(()->dumpPagesToPMem());
-    externalSorter.getInMemSorter().getSortedIterator();
+    inMemSorter.getSortedIterator();
     try {
       writeDuration = future.get();
     } catch (InterruptedException | ExecutionException e) {
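Context for this hunk: the write path submits dumpPagesToPMem() to a single-thread executor so the PMem dump overlaps with sorting on the calling thread, then joins through future.get(). A minimal sketch of that overlap pattern, with a sleep and an int array standing in for the real dump and sorter:

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OverlapDumpAndSortSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        // Background task stands in for dumpPagesToPMem(); it returns the
        // time it spent, as the real method returns the write duration.
        Future<Long> future = executorService.submit(() -> {
            long start = System.nanoTime();
            Thread.sleep(50);
            return System.nanoTime() - start;
        });
        // Meanwhile the calling thread sorts, standing in for
        // inMemSorter.getSortedIterator().
        int[] records = {3, 1, 2};
        Arrays.sort(records);
        long writeDuration = future.get(); // join; rethrows dump failures
        System.out.println("dump duration ns: " + writeDuration);
        executorService.shutdown();
    }
}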
@@ -106,7 +106,7 @@
     } else if(!isSorted) {
       dumpPagesToPMem();
       // get sorted iterator
-      externalSorter.getInMemSorter().getSortedIterator();
+      inMemSorter.getSortedIterator();
       // update LongArray
       updateLongArray(inMemSorter.getArray(), totalRecordsWritten, 0);
     } else {
@@ -121,7 +121,7 @@
     diskSpillWriter = new UnsafeSorterSpillWriter(
       blockManager,
       fileBufferSize,
-      sortedIterator,
+      isSorted? sortedIterator : inMemSorter.getSortedIterator(),
       numberOfRecordsToWritten,
       serializerManager,
       writeMetrics,
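Reading the three write()-path hunks together (hedged; inferred from the diff alone): the pre-captured sortedIterator only reflects a sorted order when isSorted is true, so the spill writer must otherwise ask the cached inMemSorter field for a freshly sorted iterator. A toy sketch of that guard, with arrays standing in for the sorter types:

import java.util.Arrays;

public class SortedIteratorGuardSketch {
    static int[] records = {3, 1, 2};

    // Stand-in for the pre-captured sortedIterator: a view of the records
    // in their current order.
    static int[] preCaptured() { return records; }

    // Stand-in for inMemSorter.getSortedIterator(): sorts on demand.
    static int[] sortNow() {
        int[] copy = records.clone();
        Arrays.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        boolean isSorted = false;
        // The hunk's pattern: reuse the captured iterator only when the
        // data is already sorted; otherwise sort before spilling.
        int[] toSpill = isSorted ? preCaptured() : sortNow();
        System.out.println(Arrays.toString(toSpill)); // [1, 2, 3]
    }
}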
@@ -138,6 +138,7 @@ public boolean allocatePMemPages(LinkedList<MemoryBlock> dramPages, MemoryBlock
       allocatedPMemPages.add(pMemBlock);
       pageMap.put(page, pMemBlock);
     } else {
+      freeAllPMemPages();
       pageMap.clear();
       return false;
     }
@@ -147,6 +148,7 @@
       allocatedPMemPages.add(pMemPageForLongArray);
       pageMap.put(longArrayPage, pMemPageForLongArray);
     } else {
+      freeAllPMemPages();
       pageMap.clear();
       return false;
     }
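The two freeAllPMemPages() additions share one purpose: if any allocation in the batch fails, the pages already obtained are released before the early return false, so a partial allocation cannot leak PMem. A self-contained toy of that allocate-or-roll-back shape (names are stand-ins, not the actual PMem writer API):

import java.util.ArrayList;
import java.util.List;

public class AllocateOrRollbackSketch {
    static final List<long[]> allocatedPMemPages = new ArrayList<>();

    // Simulated allocator: refuses anything larger than 4096 slots.
    static long[] tryAllocate(int size) {
        return size <= 4096 ? new long[size] : null;
    }

    // Mirrors the cleanup the PR adds before each early `return false`.
    static void freeAllPMemPages() {
        allocatedPMemPages.clear();
    }

    static boolean allocatePages(int[] sizes) {
        for (int size : sizes) {
            long[] page = tryAllocate(size);
            if (page == null) {
                freeAllPMemPages(); // roll back the partial allocation
                return false;
            }
            allocatedPMemPages.add(page);
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allocatePages(new int[]{1024, 8192})); // false
        System.out.println(allocatedPMemPages.size());            // 0: no leak
    }
}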
src/main/scala/org/apache/spark/SparkEnv.scala (10 additions, 0 deletions)
@@ -27,6 +27,8 @@ import scala.collection.mutable
 import scala.util.Properties
 
 import com.google.common.cache.CacheBuilder
+import com.intel.oap.common.unsafe.PersistentMemoryPlatform
+
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.annotation.DeveloperApi
@@ -244,6 +246,14 @@
 
     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
 
+    val pMemEnabled = conf.get(MEMORY_SPILL_PMEM_ENABLED);
+    if (pMemEnabled && !isDriver) {
+      val pMemInitialPath = conf.get(MEMORY_EXTENDED_PATH)
+      val pMemInitialSize = conf.get(MEMORY_EXTENDED_SIZE)
+      PersistentMemoryPlatform.initialize(pMemInitialPath, pMemInitialSize, 0)
+      logInfo(s"PMem initialize path: ${pMemInitialPath}, size: ${pMemInitialSize} ")
+    }
+
     // Listener bus is only used on the driver
     if (isDriver) {
       assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
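With this block, spill-to-PMem initialization happens once per executor during SparkEnv creation. The actual string keys behind MEMORY_SPILL_PMEM_ENABLED, MEMORY_EXTENDED_PATH, and MEMORY_EXTENDED_SIZE are not visible in this diff, so every key name in the following configuration sketch is an assumption:

import org.apache.spark.SparkConf;

public class PMemSpillConfSketch {
    public static void main(String[] args) {
        // All three key strings below are assumed for illustration; the
        // diff reads them through typed entries (MEMORY_SPILL_PMEM_ENABLED,
        // MEMORY_EXTENDED_PATH, MEMORY_EXTENDED_SIZE).
        SparkConf conf = new SparkConf()
            .set("spark.memory.spill.pmem.enabled", "true")
            .set("spark.memory.extended.path", "/mnt/pmem0")
            .set("spark.memory.extended.size", "64g");
        // With settings like these, each executor (never the driver) would
        // call PersistentMemoryPlatform.initialize once during SparkEnv
        // creation.
        System.out.println(conf.toDebugString());
    }
}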
src/main/scala/org/apache/spark/storage/BlockManager.scala (2 additions, 2 deletions)
@@ -189,7 +189,7 @@ private[spark] class BlockManager(
     val pmemMode = conf.get("spark.memory.pmem.mode", "AppDirect")
     val numNum = conf.getInt("spark.yarn.numa.num", 2)
 
-    if (pmemMode.equals("AppDirect")) {
+    if (memExtensionEnabled && pmemMode.equals("AppDirect")) {
       if (!isDriver && pmemInitialPaths.size >= 1) {
         if (numaNodeId == -1) {
           numaNodeId = executorId.toInt
@@ -213,7 +213,7 @@
         PersistentMemoryPlatform.initialize(file.getAbsolutePath, pmemInitialSize, 0)
         logInfo(s"Intel Optane PMem initialized with path: ${file.getAbsolutePath}, size: ${pmemInitialSize} ")
       }
-    } else if (pmemMode.equals("KMemDax")) {
+    } else if (memExtensionEnabled && pmemMode.equals("KMemDax")) {
       if (!isDriver) {
         if (numaNodeId == -1) {
           numaNodeId = (executorId.toInt + 1) % 2
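These two guards implement the "disable RDD cache related PMem initialization by default" part of the title: unless memory extension is explicitly enabled, BlockManager now skips both the AppDirect and KMemDax branches. A sketch of the resulting default-off behavior; the flag key behind memExtensionEnabled is not shown in the diff, so the name below is assumed:

import org.apache.spark.SparkConf;

public class DefaultOffGateSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf(); // nothing set: the default case
        // Assumed key for the flag the diff exposes as memExtensionEnabled.
        boolean memExtensionEnabled =
            conf.getBoolean("spark.memory.extension.enabled", false);
        String pmemMode = conf.get("spark.memory.pmem.mode", "AppDirect");

        if (memExtensionEnabled && pmemMode.equals("AppDirect")) {
            System.out.println("would initialize PMem for RDD cache");
        } else if (memExtensionEnabled && pmemMode.equals("KMemDax")) {
            System.out.println("would initialize KMemDax mode");
        } else {
            System.out.println("PMem RDD-cache init skipped (the new default)");
        }
    }
}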