Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[PMEM-SHUFFLE-7] enable fsdax mode in pmem-shuffle #6

Merged
merged 3 commits into from
Mar 5, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.spark.storage.pmof

import java.io.File
import java.nio.ByteBuffer

import org.apache.spark.internal.Logging
Expand All @@ -25,6 +26,8 @@ private[spark] class PersistentMemoryHandler(
// need to use a locked file to get which pmem device should be used.
val pmMetaHandler: PersistentMemoryMetaHandler = new PersistentMemoryMetaHandler(root_dir)
var device: String = pmMetaHandler.getShuffleDevice(shuffleId)
var poolFile = ""
var isFsdaxFile = false
if(device == "") {
//this shuffleId haven't been written before, choose a new device
val path_array_list = new java.util.ArrayList[String](path_list.asJava)
Expand All @@ -33,15 +36,17 @@ private[spark] class PersistentMemoryHandler(
val dev = Paths.get(device)
if (Files.isDirectory(dev)) {
// this is fsdax, add a subfile
device += "/shuffle_block_" + UUID.randomUUID().toString()
logInfo("This is a fsdax, filename:" + device)
isFsdaxFile = true
poolFile = device + "/shuffle_block_" + UUID.randomUUID().toString()
logInfo("This is a fsdax, filename:" + poolFile)
} else {
logInfo("This is a devdax, name:" + device)
poolFile = device
logInfo("This is a devdax, name:" + poolFile)
poolSize = 0
}
}

val pmpool = new PersistentMemoryPool(device, poolSize)
val pmpool = new PersistentMemoryPool(poolFile, poolSize)
var rkey: Long = 0


Expand Down Expand Up @@ -84,8 +89,20 @@ private[spark] class PersistentMemoryHandler(
}

def close(): Unit = synchronized {
pmpool.close()
pmMetaHandler.remove()
if (isFsdaxFile) {
try {
if (new File(poolFile).delete()) {
logInfo("File deleted successfully: " + poolFile)
} else {
logWarning("Failed to delete file: " + poolFile)
}
} catch {
case e: Exception => e.printStackTrace()
}
} else {
pmpool.close()
pmMetaHandler.remove()
}
}

def getRootAddr(): Long = {
Expand Down
8 changes: 4 additions & 4 deletions native/src/pmemkv.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ key_3 --> block_meta_list_3[block_meta, block_meta, block_meta]
*/
class pmemkv {
public:
explicit pmemkv(const char* dev_path_) : pmem_pool(nullptr), dev_path(dev_path_), bp(nullptr) {
if (create()) {
explicit pmemkv(const char* dev_path_, long size) : pmem_pool(nullptr), dev_path(dev_path_), bp(nullptr) {
if (create(size)) {
int res = open();
if (res) {
std::cout << "failed to open pmem pool, errmsg: " << pmemobj_errormsg() << std::endl;
Expand Down Expand Up @@ -448,12 +448,12 @@ class pmemkv {
return (uint64_t)pmem_pool;
}
private:
int create() {
int create(long size) {
// debug setting
int sds_write_value = 0;
pmemobj_ctl_set(nullptr, "sds.at_create", &sds_write_value);

pmem_pool = pmemobj_create(dev_path, PMEMKV_LAYOUT_NAME, 0, 0666);
pmem_pool = pmemobj_create(dev_path, PMEMKV_LAYOUT_NAME, size, 0666);
if (pmem_pool == nullptr) {
return -1;
}
Expand Down