Skip to content
This repository has been archived by the owner on Nov 14, 2022. It is now read-only.

Commit

Permalink
[POAE7-858] disable RDD cache related PMem intialization as default a…
Browse files Browse the repository at this point in the history
…nd add PMem related logic in SparkEnv (#18)

* disable RDD cache related PMem intialization as default

* add PMem related logic in SparkEnv

* fix bug happened when spill fallback to disk
  • Loading branch information
yma11 authored Mar 8, 2021
1 parent ac0ccc0 commit 6a5b99c
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public PMemReaderForUnsafeExternalSorter(
this.numRecordsRemaining = numRecords - position/2;
this.taskMetrics = taskMetrics;
int readBufferSize = SparkEnv.get() == null? 8 * 1024 * 1024 :
(int) SparkEnv.get().conf().get(package$.MODULE$.MEMORY_SPILL_PMEM_READ_BUFFERSIZE());
(int) (long) SparkEnv.get().conf().get(package$.MODULE$.MEMORY_SPILL_PMEM_READ_BUFFERSIZE());
logger.info("PMem read buffer size is:" + Utils.bytesToString(readBufferSize));
this.byteBuffer = ByteBuffer.wrap(new byte[readBufferSize]);
byteBuffer.flip();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void write() throws IOException {
long writeDuration = 0;
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Long> future = executorService.submit(()->dumpPagesToPMem());
externalSorter.getInMemSorter().getSortedIterator();
inMemSorter.getSortedIterator();
try {
writeDuration = future.get();
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -106,7 +106,7 @@ public void write() throws IOException {
} else if(!isSorted) {
dumpPagesToPMem();
// get sorted iterator
externalSorter.getInMemSorter().getSortedIterator();
inMemSorter.getSortedIterator();
// update LongArray
updateLongArray(inMemSorter.getArray(), totalRecordsWritten, 0);
} else {
Expand All @@ -121,7 +121,7 @@ public void write() throws IOException {
diskSpillWriter = new UnsafeSorterSpillWriter(
blockManager,
fileBufferSize,
sortedIterator,
isSorted? sortedIterator : inMemSorter.getSortedIterator(),
numberOfRecordsToWritten,
serializerManager,
writeMetrics,
Expand All @@ -138,6 +138,7 @@ public boolean allocatePMemPages(LinkedList<MemoryBlock> dramPages, MemoryBlock
allocatedPMemPages.add(pMemBlock);
pageMap.put(page, pMemBlock);
} else {
freeAllPMemPages();
pageMap.clear();
return false;
}
Expand All @@ -147,6 +148,7 @@ public boolean allocatePMemPages(LinkedList<MemoryBlock> dramPages, MemoryBlock
allocatedPMemPages.add(pMemPageForLongArray);
pageMap.put(longArrayPage, pMemPageForLongArray);
} else {
freeAllPMemPages();
pageMap.clear();
return false;
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import scala.collection.mutable
import scala.util.Properties

import com.google.common.cache.CacheBuilder
import com.intel.oap.common.unsafe.PersistentMemoryPlatform

import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.DeveloperApi
Expand Down Expand Up @@ -244,6 +246,14 @@ object SparkEnv extends Logging {

val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

val pMemEnabled = conf.get(MEMORY_SPILL_PMEM_ENABLED);
if (pMemEnabled && !isDriver) {
val pMemInitialPath = conf.get(MEMORY_EXTENDED_PATH)
val pMemInitialSize = conf.get(MEMORY_EXTENDED_SIZE)
PersistentMemoryPlatform.initialize(pMemInitialPath, pMemInitialSize, 0)
logInfo(s"PMem initialize path: ${pMemInitialPath}, size: ${pMemInitialSize} ")
}

// Listener bus is only used on the driver
if (isDriver) {
assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private[spark] class BlockManager(
val pmemMode = conf.get("spark.memory.pmem.mode", "AppDirect")
val numNum = conf.getInt("spark.yarn.numa.num", 2)

if (pmemMode.equals("AppDirect")) {
if (memExtensionEnabled && pmemMode.equals("AppDirect")) {
if (!isDriver && pmemInitialPaths.size >= 1) {
if (numaNodeId == -1) {
numaNodeId = executorId.toInt
Expand All @@ -213,7 +213,7 @@ private[spark] class BlockManager(
PersistentMemoryPlatform.initialize(file.getAbsolutePath, pmemInitialSize, 0)
logInfo(s"Intel Optane PMem initialized with path: ${file.getAbsolutePath}, size: ${pmemInitialSize} ")
}
} else if (pmemMode.equals("KMemDax")) {
} else if (memExtensionEnabled && pmemMode.equals("KMemDax")) {
if (!isDriver) {
if (numaNodeId == -1) {
numaNodeId = (executorId.toInt + 1) % 2
Expand Down

0 comments on commit 6a5b99c

Please sign in to comment.