Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[PMEM-SHUFFLE-48] Fix the bug about mapstatus tracking and add more c…
Browse files Browse the repository at this point in the history
…onnections for Redis
  • Loading branch information
Eugene-Mark committed Aug 23, 2021
1 parent 7bc311c commit 79f2519
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 56 deletions.
12 changes: 12 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[submodule "rpmp/include/hiredis"]
path = rpmp/include/hiredis
url = https://github.com/redis/hiredis.git
branch = master
[submodule "rpmp/include/jsoncpp"]
path = rpmp/include/jsoncpp
url = https://github.com/open-source-parsers/jsoncpp.git
branch = master
[submodule "rpmp/include/spdlog"]
path = rpmp/include/spdlog
url = https://github.com/gabime/spdlog.git
branch = v1.x
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private[spark] class UnCompressedMapStatus(
val step = 8

def this(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long) = {
this(loc, uncompressedSizes.map(MapStatus.compressSize), mapTaskId)
this(loc, uncompressedSizes.flatMap(UnCompressedMapStatus.longToBytes), mapTaskId)
}

override def updateLocation(newLoc: BlockManagerId): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import org.apache.spark.storage.BlockManager
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import java.security.MessageDigest

private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffleBlockResolver,
metadataResolver: MetadataResolver,
blockManager: BlockManager,
Expand Down Expand Up @@ -128,22 +130,9 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl
}

val shuffleServerId = blockManager.shuffleServerId
/**
if (pmofConf.enableRdma) {
val rkey = PmemBlockOutputStreamArray(0).getRkey()
metadataResolver.pushPmemBlockInfo(stageId, mapId, pmemBlockInfoMap, rkey)
val blockManagerId: BlockManagerId =
BlockManagerId(shuffleServerId.executorId, PmofTransferService.shuffleNodesMap(shuffleServerId.host),
PmofTransferService.getTransferServiceInstance(pmofConf, blockManager).port, shuffleServerId.topologyInfo)
mapStatus = MapStatus(blockManagerId, partitionLengths, mapId)
} else {
mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId)
}
**/

if (pmofConf.enableRemotePmem) {
mapStatus = new UnCompressedMapStatus(shuffleServerId, partitionLengths, mapId)
//mapStatus = MapStatus(shuffleServerId, partitionLengths)
} else if (!pmofConf.enableRdma) {
mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId)
} else {
Expand All @@ -156,6 +145,20 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl
shuffleServerId.topologyInfo)
mapStatus = MapStatus(blockManagerId, partitionLengths, mapId)
}
/**
For debug usage
logInfo(
s" shuffle_${stageId}_${mapId}_0 size is ${partitionLengths(0)}, decompressed size is ${mapStatus
.getSizeForBlock(0)}")
**/
}


/**
* For debug usage
**/
def md5(s: String) = {
MessageDigest.getInstance("MD5").digest(s.getBytes)
}

/** Close this writer, passing along whether the map completed */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ object NettyByteBufferPool extends Logging {
}
}

def allocateNewBuffer(): ByteBuf = synchronized {
allocator.directBuffer()
}

def allocateFlexibleNewBuffer(bufSize: Int): ByteBuf = synchronized {
val byteBuf = allocator.directBuffer(65536, bufSize * 2)
bufferMap += (byteBuf -> bufSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class PmemOutputStream(
key_id += 1
flushed_block_id = cur_block_id
cur_block_id = s"${blockId}_${key_id}"
logDebug(s" [PUT Completed]${blockId}-${bufferRemainingSize}, ${NettyByteBufferPool.dump(byteBuffer, bufferRemainingSize)}")
} else {
val byteBuffer: ByteBuffer = buf.nioBuffer()
persistentMemoryWriter.setPartition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,17 @@ private[spark] final class RpmpShuffleBlockFetcherIterator(
private[this] var address: BlockManagerId = _
private[this] var blockInfos: Seq[(BlockId, Long, Int)] = _
private[this] var iterator: Iterator[(BlockId, Long, Int)] = _
private[this] var blocksByAddressSeq: Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] = _

initialize()

def initialize(): Unit = {
context.addTaskCompletionListener[Unit](_ => cleanup())
blocksByAddressSize = blocksByAddress.size
blocksByAddressSeq = blocksByAddress.toSeq
blocksByAddressSize = blocksByAddressSeq.size

if (blocksByAddressCurrentId < blocksByAddressSize) {
val res = blocksByAddress.toSeq(blocksByAddressCurrentId)
val res = blocksByAddressSeq(blocksByAddressCurrentId)
address = res._1
blockInfos = res._2
iterator = blockInfos.iterator
Expand Down Expand Up @@ -228,7 +231,7 @@ private[spark] final class RpmpShuffleBlockFetcherIterator(
has_next = false
return has_next
}
val res = blocksByAddress.toSeq(blocksByAddressCurrentId)
val res = blocksByAddressSeq(blocksByAddressCurrentId)
address = res._1
blockInfos = res._2
iterator = blockInfos.iterator
Expand Down
1 change: 1 addition & 0 deletions rpmp/include/hiredis
Submodule hiredis added at 2d9d77
1 change: 1 addition & 0 deletions rpmp/include/jsoncpp
Submodule jsoncpp added at c39fbd
1 change: 1 addition & 0 deletions rpmp/include/spdlog
Submodule spdlog added at 5df9b1
8 changes: 8 additions & 0 deletions rpmp/pmpool/proxy/PhysicalNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ class PhysicalNode {

string getPort() { return port; }

void setIp(string ip){
this->ip = ip;
}

void setPort(string port){
this->port = port;
}

private:
string ip;
string port;
Expand Down
53 changes: 53 additions & 0 deletions rpmp/pmpool/proxy/clientService/ClientService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,58 @@ void ClientService::addRecords(uint64_t key, unordered_set<PhysicalNode, Physica
metastore_->set(to_string(key), json_str);
}

std::unordered_set<PhysicalNode, PhysicalNodeHash> ClientService::getNodes(uint64_t key){
std::unordered_set<PhysicalNode, PhysicalNodeHash> nodes;

int retry = 10;
int timeout = 1;
while(retry > 0){
std::string key_str = to_string(key);
auto rawJson = metastore_->get(key_str);

const auto rawJsonLength = static_cast<int>(rawJson.length());
JSONCPP_STRING err;
Json::Value root;

Json::CharReaderBuilder builder;
const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root,
&err)) {
#ifdef DEBUG
std::cout << "key: " << key <<endl;
std::cout << "rawJson: " << rawJson.c_str() <<endl;
std::cout << "ClientService::Error occurred in getNodes." << std::endl;
#endif
sleep(timeout);
retry--;
continue;
}else{
retry = -1;
}

Json::Value recordArray = root["data"];
Json::ArrayIndex length = recordArray.size();
Json::Value data;

for(Json::ArrayIndex i = 0; i < length; i++){
if(recordArray[i][STATUS] == VALID){
PhysicalNode* node = new PhysicalNode();
string ip = recordArray[i][NODE].toStyledString();
cout<<"ip: "<<ip<<endl;
node->setIp(ip);
node->setPort("12346");
nodes.insert(*node);
}
}
}

if(retry == 0){
std::cout << "ClientService::Error occurred in getNodes with multiples retrys for key: " << key << std::endl;
}

return nodes;
}

void ClientService::enqueue_recv_msg(std::shared_ptr<ProxyRequest> request) {
worker_->addTask(request);
// ProxyRequestContext rc = request->get_rc();
Expand Down Expand Up @@ -145,6 +197,7 @@ void ClientService::handle_recv_msg(std::shared_ptr<ProxyRequest> request) {
}
case GET_REPLICA: {
auto nodes = proxyServer_->getReplica(rc.key);
//auto nodes = getNodes(rc.key);
rrc.type = rc.type;
rrc.key = rc.key;
rrc.success = 0;
Expand Down
2 changes: 2 additions & 0 deletions rpmp/pmpool/proxy/clientService/ClientService.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class ClientService : public std::enable_shared_from_this<ClientService>{
* NODE:
* STATUS:
* SIZE:
* }
*
**/
const string JOB_STATUS = "JOB_STATUS";
Expand All @@ -116,6 +117,7 @@ class ClientService : public std::enable_shared_from_this<ClientService>{
// std::vector<std::shared_ptr<Worker>> workers_;
void constructJobStatus(Json::Value record, uint64_t key);
void addRecords(uint64_t key, unordered_set<PhysicalNode, PhysicalNodeHash> nodes);
std::unordered_set<PhysicalNode, PhysicalNodeHash> getNodes(uint64_t key);

std::shared_ptr<Worker> worker_;
std::shared_ptr<ChunkMgr> chunkMgr_;
Expand Down
13 changes: 11 additions & 2 deletions rpmp/pmpool/proxy/metastore/redis/Redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include <unordered_set>
#include <sw/redis++/redis++.h>
#include <sw/redis++/connection.h>
#include <sw/redis++/connection_pool.h>

// TODO: RPMP proxy process should not be terminated at runtime when cannot connect to Redis for query, etc.
Redis::Redis(std::shared_ptr<Config> config, std::shared_ptr<RLog> log){
Expand All @@ -21,8 +23,15 @@ Redis::Redis(std::shared_ptr<Config> config, std::shared_ptr<RLog> log){
**/
bool Redis::connect() {
// Create an Redis object, which is movable but NOT copyable.
string connection_str = "tcp://" + address_ + ":" + port_;
redis_ = new sw::redis::Redis(connection_str);
sw::redis::ConnectionOptions connection_options;
connection_options.host = address_; // Required.
connection_options.port = stoi(port_); // Optional. The default port is 6379.

sw::redis::ConnectionPoolOptions pool_options;
pool_options.size = 5; // Pool size, i.e. max number of connections.

//string connection_str = "tcp://" + address_ + ":" + port_;
redis_ = new sw::redis::Redis(connection_options, pool_options);
return true;
}

Expand Down
80 changes: 47 additions & 33 deletions rpmp/pmpool/proxy/replicaService/ReplicaService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,43 +69,57 @@ void ReplicaService::enqueue_recv_msg(std::shared_ptr<ReplicaRequest> request) {
* Update data status once it's been put to the node successfully
**/
void ReplicaService::updateRecord(uint64_t key, PhysicalNode node, uint64_t size){
string rawJson = metastore_->get(to_string(key));
#ifdef DEBUG
cout<<rawJson<<endl;
#endif
const auto rawJsonLength = static_cast<int>(rawJson.length());
JSONCPP_STRING err;
Json::Value root;

Json::CharReaderBuilder builder;
const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root,
&err)) {
#ifndef DEBUG
std::cout << "key: " << key <<endl;
std::cout << "rawJson: " << rawJson.c_str() <<endl;
int retry = 10;
int timeout = 1;
while(retry > 0){
std::string key_str = to_string(key);
auto rawJson = metastore_->get(key_str);
#ifdef DEBUG
cout<<rawJson<<endl;
#endif
std::cout << "ReplicaService::Error occurred in UpdateRecord." << std::endl;
}

Json::Value recordArray = root["data"];
Json::ArrayIndex length = recordArray.size();
Json::Value data;

for(Json::ArrayIndex i = 0; i < length; i++){
data[i][NODE] = recordArray[i][NODE];
if(data[i][NODE] == node.getIp()){
data[i][STATUS] = VALID;
data[i][SIZE] = to_string(size);
const auto rawJsonLength = static_cast<int>(rawJson.length());
JSONCPP_STRING err;
Json::Value root;

Json::CharReaderBuilder builder;
const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root,
&err)) {
#ifdef DEBUG
std::cout << "key: " << key <<endl;
std::cout << "rawJson: " << rawJson.c_str() <<endl;
std::cout << "ReplicaService::Error occurred in UpdateRecord." << std::endl;
#endif
sleep(timeout);
retry--;
continue;
}else{
data[i][STATUS] = recordArray[i][STATUS];
data[i][SIZE] = to_string(size);
retry = -1;
}

Json::Value recordArray = root["data"];
Json::ArrayIndex length = recordArray.size();
Json::Value data;

for(Json::ArrayIndex i = 0; i < length; i++){
data[i][NODE] = recordArray[i][NODE];
if(data[i][NODE] == node.getIp()){
data[i][STATUS] = VALID;
data[i][SIZE] = to_string(size);
}else{
data[i][STATUS] = recordArray[i][STATUS];
data[i][SIZE] = to_string(size);
}
}
}

root["data"] = data;
string json_str = rootToString(root);
metastore_->set(to_string(key), json_str);
root["data"] = data;
string json_str = rootToString(root);
metastore_->set(to_string(key), json_str);
}
if(retry == 0){
std::cout << "key: " << key <<endl;
std::cout << "ReplicaService::Error occurred in UpdateRecord with multiples retrys." << std::endl;
}
}

ChunkMgr* ReplicaService::getChunkMgr(){
Expand Down

0 comments on commit 79f2519

Please sign in to comment.