Skip to content
This repository has been archived by the owner on Nov 14, 2022. It is now read-only.

[POAE7-858] disable RDD cache related PMem intialization as default and add PMem related logic in SparkEnv #18

Merged
merged 4 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public PMemReaderForUnsafeExternalSorter(
this.numRecordsRemaining = numRecords - position/2;
this.taskMetrics = taskMetrics;
int readBufferSize = SparkEnv.get() == null? 8 * 1024 * 1024 :
(int) SparkEnv.get().conf().get(package$.MODULE$.MEMORY_SPILL_PMEM_READ_BUFFERSIZE());
(int) (long) SparkEnv.get().conf().get(package$.MODULE$.MEMORY_SPILL_PMEM_READ_BUFFERSIZE());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is referring other similar usages in Vanilla Spark.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, didn't think up why they didn't use a getLong API? That seems better.

logger.info("PMem read buffer size is:" + Utils.bytesToString(readBufferSize));
this.byteBuffer = ByteBuffer.wrap(new byte[readBufferSize]);
byteBuffer.flip();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void write() throws IOException {
long writeDuration = 0;
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Long> future = executorService.submit(()->dumpPagesToPMem());
externalSorter.getInMemSorter().getSortedIterator();
inMemSorter.getSortedIterator();
try {
writeDuration = future.get();
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -106,7 +106,7 @@ public void write() throws IOException {
} else if(!isSorted) {
dumpPagesToPMem();
// get sorted iterator
externalSorter.getInMemSorter().getSortedIterator();
inMemSorter.getSortedIterator();
// update LongArray
updateLongArray(inMemSorter.getArray(), totalRecordsWritten, 0);
} else {
Expand All @@ -121,7 +121,7 @@ public void write() throws IOException {
diskSpillWriter = new UnsafeSorterSpillWriter(
blockManager,
fileBufferSize,
sortedIterator,
isSorted? sortedIterator : inMemSorter.getSortedIterator(),
numberOfRecordsToWritten,
serializerManager,
writeMetrics,
Expand All @@ -138,6 +138,7 @@ public boolean allocatePMemPages(LinkedList<MemoryBlock> dramPages, MemoryBlock
allocatedPMemPages.add(pMemBlock);
pageMap.put(page, pMemBlock);
} else {
freeAllPMemPages();
pageMap.clear();
return false;
}
Expand All @@ -147,6 +148,7 @@ public boolean allocatePMemPages(LinkedList<MemoryBlock> dramPages, MemoryBlock
allocatedPMemPages.add(pMemPageForLongArray);
pageMap.put(longArrayPage, pMemPageForLongArray);
} else {
freeAllPMemPages();
pageMap.clear();
return false;
}
Expand Down
Loading