From 79f2519413b4fe2c66b9f338e402a3d38198fd3e Mon Sep 17 00:00:00 2001
From: Eugene-Mark
Date: Mon, 23 Aug 2021 09:38:24 +0800
Subject: [PATCH] [PMEM-SHUFFLE-48] Fix the bug about mapstatus tracking and add more connections for Redis

---
 .gitmodules                                   | 12 +++
 .../pmof/UnCompressedMapStatus.scala          |  2 +-
 .../shuffle/pmof/PmemShuffleWriter.scala      | 29 ++++---
 .../storage/pmof/NettyByteBufferPool.scala    |  4 -
 .../spark/storage/pmof/PmemOutputStream.scala |  1 +
 .../RpmpShuffleBlockFetcherIterator.scala     |  9 ++-
 rpmp/include/hiredis                          |  1 +
 rpmp/include/jsoncpp                          |  1 +
 rpmp/include/spdlog                           |  1 +
 rpmp/pmpool/proxy/PhysicalNode.h              |  8 ++
 .../proxy/clientService/ClientService.cc      | 53 ++++++++++++
 .../proxy/clientService/ClientService.h       |  2 +
 rpmp/pmpool/proxy/metastore/redis/Redis.cc    | 13 ++-
 .../proxy/replicaService/ReplicaService.cc    | 80 +++++++++++--------
 14 files changed, 160 insertions(+), 56 deletions(-)
 create mode 100644 .gitmodules
 create mode 160000 rpmp/include/hiredis
 create mode 160000 rpmp/include/jsoncpp
 create mode 160000 rpmp/include/spdlog

diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 00000000..0270b27e
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,12 @@
+[submodule "rpmp/include/hiredis"]
+	path = rpmp/include/hiredis
+	url = https://github.com/redis/hiredis.git
+	branch = master
+[submodule "rpmp/include/jsoncpp"]
+	path = rpmp/include/jsoncpp
+	url = https://github.com/open-source-parsers/jsoncpp.git
+	branch = master
+[submodule "rpmp/include/spdlog"]
+	path = rpmp/include/spdlog
+	url = https://github.com/gabime/spdlog.git
+	branch = v1.x
diff --git a/core/src/main/scala/org/apache/spark/scheduler/pmof/UnCompressedMapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/pmof/UnCompressedMapStatus.scala
index da368c0e..98391624 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/pmof/UnCompressedMapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/pmof/UnCompressedMapStatus.scala
@@ -27,7 +27,7 @@ private[spark] class UnCompressedMapStatus(
   val step = 8
 
   def this(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long) = {
-    this(loc, uncompressedSizes.map(MapStatus.compressSize), mapTaskId)
+    this(loc, uncompressedSizes.flatMap(UnCompressedMapStatus.longToBytes), mapTaskId)
   }
 
   override def updateLocation(newLoc: BlockManagerId): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala
index 20727e34..daeb1324 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala
@@ -33,6 +33,8 @@ import org.apache.spark.storage.BlockManager
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import java.security.MessageDigest
+
 private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffleBlockResolver,
                                                 metadataResolver: MetadataResolver,
                                                 blockManager: BlockManager,
@@ -128,22 +130,9 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl
     }
 
     val shuffleServerId = blockManager.shuffleServerId
-    /**
-    if (pmofConf.enableRdma) {
-      val rkey = PmemBlockOutputStreamArray(0).getRkey()
-      metadataResolver.pushPmemBlockInfo(stageId, mapId, pmemBlockInfoMap, rkey)
-      val blockManagerId: BlockManagerId =
-        BlockManagerId(shuffleServerId.executorId, PmofTransferService.shuffleNodesMap(shuffleServerId.host),
-          PmofTransferService.getTransferServiceInstance(pmofConf, blockManager).port, shuffleServerId.topologyInfo)
-      mapStatus = MapStatus(blockManagerId, partitionLengths, mapId)
-    } else {
-      mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId)
-    }
-    **/
     if (pmofConf.enableRemotePmem) {
       mapStatus = new UnCompressedMapStatus(shuffleServerId, partitionLengths, mapId)
-      //mapStatus = MapStatus(shuffleServerId, partitionLengths)
     } else if (!pmofConf.enableRdma) {
       mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId)
     } else {
@@ -156,6 +145,20 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl
         shuffleServerId.topologyInfo)
       mapStatus = MapStatus(blockManagerId, partitionLengths, mapId)
     }
+    /**
+    For debug usage
+    logInfo(
+      s" shuffle_${stageId}_${mapId}_0 size is ${partitionLengths(0)}, decompressed size is ${mapStatus
+      .getSizeForBlock(0)}")
+    **/
   }
+
+
+  /**
+   * For debug usage
+   **/
+  def md5(s: String) = {
+    MessageDigest.getInstance("MD5").digest(s.getBytes)
+  }
 
   /** Close this writer, passing along whether the map completed */
diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala b/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala
index e7ebb84e..5ad49d44 100644
--- a/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala
+++ b/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala
@@ -40,10 +40,6 @@ object NettyByteBufferPool extends Logging {
     }
   }
 
-  def allocateNewBuffer(): ByteBuf = synchronized {
-    allocator.directBuffer()
-  }
-
   def allocateFlexibleNewBuffer(bufSize: Int): ByteBuf = synchronized {
     val byteBuf = allocator.directBuffer(65536, bufSize * 2)
     bufferMap += (byteBuf -> bufSize)
diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
index 00660144..e2a07b5b 100644
--- a/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
@@ -59,6 +59,7 @@ class PmemOutputStream(
       key_id += 1
       flushed_block_id = cur_block_id
       cur_block_id = s"${blockId}_${key_id}"
+      logDebug(s" [PUT Completed]${blockId}-${bufferRemainingSize}, ${NettyByteBufferPool.dump(byteBuffer, bufferRemainingSize)}")
     } else {
       val byteBuffer: ByteBuffer = buf.nioBuffer()
       persistentMemoryWriter.setPartition(
diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/RpmpShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/pmof/RpmpShuffleBlockFetcherIterator.scala
index c26f6d9f..dce2ac74 100644
--- a/core/src/main/scala/org/apache/spark/storage/pmof/RpmpShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/pmof/RpmpShuffleBlockFetcherIterator.scala
@@ -140,14 +140,17 @@ private[spark] final class RpmpShuffleBlockFetcherIterator(
   private[this] var address: BlockManagerId = _
   private[this] var blockInfos: Seq[(BlockId, Long, Int)] = _
   private[this] var iterator: Iterator[(BlockId, Long, Int)] = _
+  private[this] var blocksByAddressSeq: Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] = _
 
   initialize()
 
   def initialize(): Unit = {
     context.addTaskCompletionListener[Unit](_ => cleanup())
-    blocksByAddressSize = blocksByAddress.size
+    blocksByAddressSeq = blocksByAddress.toSeq
+    blocksByAddressSize = blocksByAddressSeq.size
+
     if (blocksByAddressCurrentId < blocksByAddressSize) {
-      val res = blocksByAddress.toSeq(blocksByAddressCurrentId)
+      val res = blocksByAddressSeq(blocksByAddressCurrentId)
       address = res._1
       blockInfos = res._2
       iterator = blockInfos.iterator
@@ -228,7 +231,7 @@ private[spark] final class RpmpShuffleBlockFetcherIterator(
         has_next = false
         return has_next
       }
-      val res = blocksByAddress.toSeq(blocksByAddressCurrentId)
+      val res = blocksByAddressSeq(blocksByAddressCurrentId)
       address = res._1
       blockInfos = res._2
       iterator = blockInfos.iterator
diff --git a/rpmp/include/hiredis b/rpmp/include/hiredis
new file mode 160000
index 00000000..2d9d7751
--- /dev/null
+++ b/rpmp/include/hiredis
@@ -0,0 +1 @@
+Subproject commit 2d9d77518d012a54ae34f9822e4b4d19823c4b75
diff --git a/rpmp/include/jsoncpp b/rpmp/include/jsoncpp
new file mode 160000
index 00000000..c39fbdac
--- /dev/null
+++ b/rpmp/include/jsoncpp
@@ -0,0 +1 @@
+Subproject commit c39fbdac0f0f6638d5cfca43988750a1aac512db
diff --git a/rpmp/include/spdlog b/rpmp/include/spdlog
new file mode 160000
index 00000000..5df9b111
--- /dev/null
+++ b/rpmp/include/spdlog
@@ -0,0 +1 @@
+Subproject commit 5df9b11141f8a6f01b069f8c2d7120b25cca6a02
diff --git a/rpmp/pmpool/proxy/PhysicalNode.h b/rpmp/pmpool/proxy/PhysicalNode.h
index f55c197f..1055cec5 100644
--- a/rpmp/pmpool/proxy/PhysicalNode.h
+++ b/rpmp/pmpool/proxy/PhysicalNode.h
@@ -25,6 +25,14 @@ class PhysicalNode {
 
   string getPort() { return port; }
 
+  void setIp(string ip){
+    this->ip = ip;
+  }
+
+  void setPort(string port){
+    this->port = port;
+  }
+
  private:
   string ip;
   string port;
diff --git a/rpmp/pmpool/proxy/clientService/ClientService.cc b/rpmp/pmpool/proxy/clientService/ClientService.cc
index 4eaac9ee..4a2dd310 100644
--- a/rpmp/pmpool/proxy/clientService/ClientService.cc
+++ b/rpmp/pmpool/proxy/clientService/ClientService.cc
@@ -105,6 +105,58 @@ void ClientService::addRecords(uint64_t key, unordered_set<PhysicalNode> nodes)
   metastore_->set(to_string(key), json_str);
 }
 
+std::unordered_set<PhysicalNode> ClientService::getNodes(uint64_t key){
+  std::unordered_set<PhysicalNode> nodes;
+
+  int retry = 10;
+  int timeout = 1;
+  while(retry > 0){
+    std::string key_str = to_string(key);
+    auto rawJson = metastore_->get(key_str);
+
+    const auto rawJsonLength = static_cast<int>(rawJson.length());
+    JSONCPP_STRING err;
+    Json::Value root;
+
+    Json::CharReaderBuilder builder;
+    const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
+    if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root,
+                       &err)) {
+      #ifdef DEBUG
+      std::cout << "key: " << key <
+    node->setIp(ip);
+    node->setPort("12346");
+    nodes.insert(*node);
+    }
+    }
+  }
+
+  if(retry == 0){
+    std::cout << "ClientService::Error occurred in getNodes with multiple retries for key: " << key << std::endl;
+  }
+
+  return nodes;
+}
+
 void ClientService::enqueue_recv_msg(std::shared_ptr<ProxyRequest> request) {
   worker_->addTask(request);
   // ProxyRequestContext rc = request->get_rc();
@@ -145,6 +197,7 @@ void ClientService::handle_recv_msg(std::shared_ptr<ProxyRequest> request) {
     }
     case GET_REPLICA: {
       auto nodes = proxyServer_->getReplica(rc.key);
+      //auto nodes = getNodes(rc.key);
       rrc.type = rc.type;
       rrc.key = rc.key;
       rrc.success = 0;
diff --git a/rpmp/pmpool/proxy/clientService/ClientService.h b/rpmp/pmpool/proxy/clientService/ClientService.h
index c3fa14bf..a2bff4e6 100644
--- a/rpmp/pmpool/proxy/clientService/ClientService.h
+++ b/rpmp/pmpool/proxy/clientService/ClientService.h
@@ -101,6 +101,7 @@ class ClientService : public std::enable_shared_from_this<ClientService>{
  * NODE:
  * STATUS:
  * SIZE:
+ * }
  *
  **/
 const string JOB_STATUS = "JOB_STATUS";
@@ -116,6 +117,7 @@ class ClientService : public std::enable_shared_from_this<ClientService>{
   // std::vector> workers_;
   void constructJobStatus(Json::Value record, uint64_t key);
   void addRecords(uint64_t key, unordered_set<PhysicalNode> nodes);
+  std::unordered_set<PhysicalNode> getNodes(uint64_t key);
 
   std::shared_ptr worker_;
   std::shared_ptr chunkMgr_;
diff --git a/rpmp/pmpool/proxy/metastore/redis/Redis.cc b/rpmp/pmpool/proxy/metastore/redis/Redis.cc
index 88e16092..7e4e3ba9 100644
--- a/rpmp/pmpool/proxy/metastore/redis/Redis.cc
+++ b/rpmp/pmpool/proxy/metastore/redis/Redis.cc
@@ -6,6 +6,8 @@
 #include 
 #include 
 
+#include 
+#include 
 
 // TODO: RPMP proxy process should not be terminated at runtime when cannot connect to Redis for query, etc.
 Redis::Redis(std::shared_ptr config, std::shared_ptr log){
@@ -21,8 +23,15 @@ Redis::Redis(std::shared_ptr config, std::shared_ptr log){
  **/
 bool Redis::connect() {
   // Create an Redis object, which is movable but NOT copyable.
-  string connection_str = "tcp://" + address_ + ":" + port_;
-  redis_ = new sw::redis::Redis(connection_str);
+  sw::redis::ConnectionOptions connection_options;
+  connection_options.host = address_;  // Required.
+  connection_options.port = stoi(port_);  // Optional. The default port is 6379.
+
+  sw::redis::ConnectionPoolOptions pool_options;
+  pool_options.size = 5;  // Pool size, i.e. max number of connections.
+
+  //string connection_str = "tcp://" + address_ + ":" + port_;
+  redis_ = new sw::redis::Redis(connection_options, pool_options);
   return true;
 }
 
diff --git a/rpmp/pmpool/proxy/replicaService/ReplicaService.cc b/rpmp/pmpool/proxy/replicaService/ReplicaService.cc
index 7c8875cf..1b220181 100644
--- a/rpmp/pmpool/proxy/replicaService/ReplicaService.cc
+++ b/rpmp/pmpool/proxy/replicaService/ReplicaService.cc
@@ -69,43 +69,57 @@ void ReplicaService::enqueue_recv_msg(std::shared_ptr request) {
  * Update data status once it's been put to the node successfully
  **/
 void ReplicaService::updateRecord(uint64_t key, PhysicalNode node, uint64_t size){
-  string rawJson = metastore_->get(to_string(key));
-  #ifdef DEBUG
-  cout<
-  #endif
-  const auto rawJsonLength = static_cast<int>(rawJson.length());
-  JSONCPP_STRING err;
-  Json::Value root;
-
-  Json::CharReaderBuilder builder;
-  const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
-  if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root,
-                     &err)) {
-    #ifndef DEBUG
-    std::cout << "key: " << key <
+  int retry = 10;
+  int timeout = 1;
+  while(retry > 0){
+    std::string key_str = to_string(key);
+    auto rawJson = metastore_->get(key_str);
+    #ifdef DEBUG
+    cout<
+    #endif
+    const auto rawJsonLength = static_cast<int>(rawJson.length());
+    JSONCPP_STRING err;
+    Json::Value root;
+
+    Json::CharReaderBuilder builder;
+    const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
+    if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root,
+                       &err)) {
+      #ifdef DEBUG
+      std::cout << "key: " << key <set(to_string(key), json_str);
+    root["data"] = data;
+    string json_str = rootToString(root);
+    metastore_->set(to_string(key), json_str);
+  }
+  if(retry == 0){
+    std::cout << "key: " << key <