Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[PMEM-SHUFFLE-48] Fix the bug about mapstatus tracking and add more connections for metastore. #49

Merged
merged 1 commit into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[submodule "rpmp/include/hiredis"]
path = rpmp/include/hiredis
url = https://github.com/redis/hiredis.git
branch = master
[submodule "rpmp/include/jsoncpp"]
path = rpmp/include/jsoncpp
url = https://github.com/open-source-parsers/jsoncpp.git
branch = master
[submodule "rpmp/include/spdlog"]
path = rpmp/include/spdlog
url = https://github.com/gabime/spdlog.git
branch = v1.x
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private[spark] class UnCompressedMapStatus(
val step = 8

/**
 * Auxiliary constructor that accepts the raw (uncompressed) partition sizes.
 *
 * Every size is expanded through `UnCompressedMapStatus.longToBytes` and the
 * resulting byte sequences are concatenated, in partition order, before being
 * handed to the primary constructor. The sizes are no longer passed through
 * `MapStatus.compressSize`; only one delegation to the primary constructor is
 * allowed, so the superseded call is removed.
 *
 * NOTE(review): presumably each size becomes `step` (8) bytes - confirm
 * against `longToBytes`.
 *
 * @param loc               location of the map output
 * @param uncompressedSizes one size per partition
 * @param mapTaskId         id of the map task that produced the output
 */
def this(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long) = {
  this(loc, uncompressedSizes.flatMap(UnCompressedMapStatus.longToBytes), mapTaskId)
}

override def updateLocation(newLoc: BlockManagerId): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import org.apache.spark.storage.BlockManager
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import java.security.MessageDigest

private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffleBlockResolver,
metadataResolver: MetadataResolver,
blockManager: BlockManager,
Expand Down Expand Up @@ -128,22 +130,9 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl
}

val shuffleServerId = blockManager.shuffleServerId
/**
if (pmofConf.enableRdma) {
val rkey = PmemBlockOutputStreamArray(0).getRkey()
metadataResolver.pushPmemBlockInfo(stageId, mapId, pmemBlockInfoMap, rkey)
val blockManagerId: BlockManagerId =
BlockManagerId(shuffleServerId.executorId, PmofTransferService.shuffleNodesMap(shuffleServerId.host),
PmofTransferService.getTransferServiceInstance(pmofConf, blockManager).port, shuffleServerId.topologyInfo)
mapStatus = MapStatus(blockManagerId, partitionLengths, mapId)
} else {
mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId)
}
**/

if (pmofConf.enableRemotePmem) {
mapStatus = new UnCompressedMapStatus(shuffleServerId, partitionLengths, mapId)
//mapStatus = MapStatus(shuffleServerId, partitionLengths)
} else if (!pmofConf.enableRdma) {
mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId)
} else {
Expand All @@ -156,6 +145,20 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl
shuffleServerId.topologyInfo)
mapStatus = MapStatus(blockManagerId, partitionLengths, mapId)
}
/**
For debug usage
logInfo(
s" shuffle_${stageId}_${mapId}_0 size is ${partitionLengths(0)}, decompressed size is ${mapStatus
.getSizeForBlock(0)}")
**/
}


/**
 * For debug usage: computes the MD5 digest of a string.
 *
 * The string is encoded as UTF-8 before hashing so that the digest does not
 * depend on the JVM's platform default charset.
 *
 * @param s text to hash
 * @return the 16-byte MD5 digest of the UTF-8 encoding of `s`
 */
def md5(s: String): Array[Byte] = {
  val utf8Bytes = s.getBytes(java.nio.charset.StandardCharsets.UTF_8)
  MessageDigest.getInstance("MD5").digest(utf8Bytes)
}

/** Close this writer, passing along whether the map completed */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ object NettyByteBufferPool extends Logging {
}
}

/**
 * Requests a direct buffer from `allocator` using its no-argument
 * `directBuffer()` call. The allocation runs inside a `synchronized` block
 * on the enclosing instance.
 *
 * @return the buffer returned by the allocator
 */
def allocateNewBuffer(): ByteBuf = this.synchronized {
  val freshBuffer: ByteBuf = allocator.directBuffer()
  freshBuffer
}

def allocateFlexibleNewBuffer(bufSize: Int): ByteBuf = synchronized {
val byteBuf = allocator.directBuffer(65536, bufSize * 2)
bufferMap += (byteBuf -> bufSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class PmemOutputStream(
key_id += 1
flushed_block_id = cur_block_id
cur_block_id = s"${blockId}_${key_id}"
logDebug(s" [PUT Completed]${blockId}-${bufferRemainingSize}, ${NettyByteBufferPool.dump(byteBuffer, bufferRemainingSize)}")
} else {
val byteBuffer: ByteBuffer = buf.nioBuffer()
persistentMemoryWriter.setPartition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,17 @@ private[spark] final class RpmpShuffleBlockFetcherIterator(
private[this] var address: BlockManagerId = _
private[this] var blockInfos: Seq[(BlockId, Long, Int)] = _
private[this] var iterator: Iterator[(BlockId, Long, Int)] = _
private[this] var blocksByAddressSeq: Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] = _

initialize()

def initialize(): Unit = {
context.addTaskCompletionListener[Unit](_ => cleanup())
blocksByAddressSize = blocksByAddress.size
blocksByAddressSeq = blocksByAddress.toSeq
blocksByAddressSize = blocksByAddressSeq.size

if (blocksByAddressCurrentId < blocksByAddressSize) {
val res = blocksByAddress.toSeq(blocksByAddressCurrentId)
val res = blocksByAddressSeq(blocksByAddressCurrentId)
address = res._1
blockInfos = res._2
iterator = blockInfos.iterator
Expand Down Expand Up @@ -228,7 +231,7 @@ private[spark] final class RpmpShuffleBlockFetcherIterator(
has_next = false
return has_next
}
val res = blocksByAddress.toSeq(blocksByAddressCurrentId)
val res = blocksByAddressSeq(blocksByAddressCurrentId)
address = res._1
blockInfos = res._2
iterator = blockInfos.iterator
Expand Down
1 change: 1 addition & 0 deletions rpmp/include/hiredis
Submodule hiredis added at 2d9d77
1 change: 1 addition & 0 deletions rpmp/include/jsoncpp
Submodule jsoncpp added at c39fbd
1 change: 1 addition & 0 deletions rpmp/include/spdlog
Submodule spdlog added at 5df9b1
8 changes: 8 additions & 0 deletions rpmp/pmpool/proxy/PhysicalNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ class PhysicalNode {

string getPort() { return port; }

// Replace the stored ip with the given value.
void setIp(string ip) { this->ip = ip; }

// Replace the stored port with the given value.
void setPort(string port) { this->port = port; }

private:
string ip;
string port;
Expand Down
53 changes: 53 additions & 0 deletions rpmp/pmpool/proxy/clientService/ClientService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,58 @@ void ClientService::addRecords(uint64_t key, unordered_set<PhysicalNode, Physica
metastore_->set(to_string(key), json_str);
}

std::unordered_set<PhysicalNode, PhysicalNodeHash> ClientService::getNodes(uint64_t key){
std::unordered_set<PhysicalNode, PhysicalNodeHash> nodes;

int retry = 10;
int timeout = 1;
while(retry > 0){
std::string key_str = to_string(key);
auto rawJson = metastore_->get(key_str);

const auto rawJsonLength = static_cast<int>(rawJson.length());
JSONCPP_STRING err;
Json::Value root;

Json::CharReaderBuilder builder;
const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root,
&err)) {
#ifdef DEBUG
std::cout << "key: " << key <<endl;
std::cout << "rawJson: " << rawJson.c_str() <<endl;
std::cout << "ClientService::Error occurred in getNodes." << std::endl;
#endif
sleep(timeout);
retry--;
continue;
}else{
retry = -1;
}

Json::Value recordArray = root["data"];
Json::ArrayIndex length = recordArray.size();
Json::Value data;

for(Json::ArrayIndex i = 0; i < length; i++){
if(recordArray[i][STATUS] == VALID){
PhysicalNode* node = new PhysicalNode();
string ip = recordArray[i][NODE].toStyledString();
cout<<"ip: "<<ip<<endl;
node->setIp(ip);
node->setPort("12346");
nodes.insert(*node);
}
}
}

if(retry == 0){
std::cout << "ClientService::Error occurred in getNodes with multiples retrys for key: " << key << std::endl;
}

return nodes;
}

void ClientService::enqueue_recv_msg(std::shared_ptr<ProxyRequest> request) {
worker_->addTask(request);
// ProxyRequestContext rc = request->get_rc();
Expand Down Expand Up @@ -145,6 +197,7 @@ void ClientService::handle_recv_msg(std::shared_ptr<ProxyRequest> request) {
}
case GET_REPLICA: {
auto nodes = proxyServer_->getReplica(rc.key);
//auto nodes = getNodes(rc.key);
rrc.type = rc.type;
rrc.key = rc.key;
rrc.success = 0;
Expand Down
2 changes: 2 additions & 0 deletions rpmp/pmpool/proxy/clientService/ClientService.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class ClientService : public std::enable_shared_from_this<ClientService>{
* NODE:
* STATUS:
* SIZE:
* }
*
**/
const string JOB_STATUS = "JOB_STATUS";
Expand All @@ -116,6 +117,7 @@ class ClientService : public std::enable_shared_from_this<ClientService>{
// std::vector<std::shared_ptr<Worker>> workers_;
void constructJobStatus(Json::Value record, uint64_t key);
void addRecords(uint64_t key, unordered_set<PhysicalNode, PhysicalNodeHash> nodes);
std::unordered_set<PhysicalNode, PhysicalNodeHash> getNodes(uint64_t key);

std::shared_ptr<Worker> worker_;
std::shared_ptr<ChunkMgr> chunkMgr_;
Expand Down
13 changes: 11 additions & 2 deletions rpmp/pmpool/proxy/metastore/redis/Redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include <unordered_set>
#include <sw/redis++/redis++.h>
#include <sw/redis++/connection.h>
#include <sw/redis++/connection_pool.h>

// TODO: The RPMP proxy process should not be terminated at runtime when it cannot connect to Redis (e.g. for a query).
Redis::Redis(std::shared_ptr<Config> config, std::shared_ptr<RLog> log){
Expand All @@ -21,8 +23,15 @@ Redis::Redis(std::shared_ptr<Config> config, std::shared_ptr<RLog> log){
**/
bool Redis::connect() {
  // Create a Redis object, which is movable but NOT copyable.
  // The client is constructed exactly once, from explicit connection and pool
  // options; constructing it a second time would leak the first instance.
  sw::redis::ConnectionOptions connection_options;
  connection_options.host = address_;     // Required.
  // Optional. The default port is 6379. stoi throws when port_ is not numeric.
  connection_options.port = stoi(port_);

  sw::redis::ConnectionPoolOptions pool_options;
  pool_options.size = 5;  // Pool size, i.e. max number of connections.

  redis_ = new sw::redis::Redis(connection_options, pool_options);
  return true;
}

Expand Down
80 changes: 47 additions & 33 deletions rpmp/pmpool/proxy/replicaService/ReplicaService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,43 +69,57 @@ void ReplicaService::enqueue_recv_msg(std::shared_ptr<ReplicaRequest> request) {
* Update data status once it's been put to the node successfully
**/
void ReplicaService::updateRecord(uint64_t key, PhysicalNode node, uint64_t size){
string rawJson = metastore_->get(to_string(key));
#ifdef DEBUG
cout<<rawJson<<endl;
#endif
const auto rawJsonLength = static_cast<int>(rawJson.length());
JSONCPP_STRING err;
Json::Value root;

Json::CharReaderBuilder builder;
const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root,
&err)) {
#ifndef DEBUG
std::cout << "key: " << key <<endl;
std::cout << "rawJson: " << rawJson.c_str() <<endl;
int retry = 10;
int timeout = 1;
while(retry > 0){
std::string key_str = to_string(key);
auto rawJson = metastore_->get(key_str);
#ifdef DEBUG
cout<<rawJson<<endl;
#endif
std::cout << "ReplicaService::Error occurred in UpdateRecord." << std::endl;
}

Json::Value recordArray = root["data"];
Json::ArrayIndex length = recordArray.size();
Json::Value data;

for(Json::ArrayIndex i = 0; i < length; i++){
data[i][NODE] = recordArray[i][NODE];
if(data[i][NODE] == node.getIp()){
data[i][STATUS] = VALID;
data[i][SIZE] = to_string(size);
const auto rawJsonLength = static_cast<int>(rawJson.length());
JSONCPP_STRING err;
Json::Value root;

Json::CharReaderBuilder builder;
const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root,
&err)) {
#ifdef DEBUG
std::cout << "key: " << key <<endl;
std::cout << "rawJson: " << rawJson.c_str() <<endl;
std::cout << "ReplicaService::Error occurred in UpdateRecord." << std::endl;
#endif
sleep(timeout);
retry--;
continue;
}else{
data[i][STATUS] = recordArray[i][STATUS];
data[i][SIZE] = to_string(size);
retry = -1;
}

Json::Value recordArray = root["data"];
Json::ArrayIndex length = recordArray.size();
Json::Value data;

for(Json::ArrayIndex i = 0; i < length; i++){
data[i][NODE] = recordArray[i][NODE];
if(data[i][NODE] == node.getIp()){
data[i][STATUS] = VALID;
data[i][SIZE] = to_string(size);
}else{
data[i][STATUS] = recordArray[i][STATUS];
data[i][SIZE] = to_string(size);
}
}
}

root["data"] = data;
string json_str = rootToString(root);
metastore_->set(to_string(key), json_str);
root["data"] = data;
string json_str = rootToString(root);
metastore_->set(to_string(key), json_str);
}
if(retry == 0){
std::cout << "key: " << key <<endl;
std::cout << "ReplicaService::Error occurred in UpdateRecord with multiples retrys." << std::endl;
}
}

ChunkMgr* ReplicaService::getChunkMgr(){
Expand Down