From 5690ea88bc836ee629b8ba0a985b6f20d3ef782b Mon Sep 17 00:00:00 2001 From: zhangshixiang Date: Mon, 27 Feb 2023 20:05:22 +0800 Subject: [PATCH 1/5] RaftPart creates a separate execution thread pool to prevent blocking rpc and raft call --- src/kvstore/NebulaStore.cpp | 8 ++++++++ src/kvstore/NebulaStore.h | 3 +-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 7d87695a1c4..d607946eb9b 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -31,6 +31,7 @@ DEFINE_bool(auto_remove_invalid_space, true, "whether remove data of invalid spa DECLARE_bool(rocksdb_disable_wal); DECLARE_int32(rocksdb_backup_interval_secs); DECLARE_int32(wal_ttl); +DEFINE_int32(raft_num_worker_threads, 32, "Raft part number of workers"); namespace nebula { namespace kvstore { @@ -54,6 +55,13 @@ bool NebulaStore::init() { bgWorkers_->start(FLAGS_num_workers, "nebula-bgworkers"); storeWorker_ = std::make_shared(); CHECK(storeWorker_->start()); + { + auto pool = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager( + FLAGS_raft_num_worker_threads); + pool->setNamePrefix("part-executor"); + pool->start(); + workers_ = std::move(pool); + } snapshot_.reset(new NebulaSnapshotManager(this)); raftService_ = raftex::RaftexService::createService(ioPool_, workers_, raftAddr_.port); if (raftService_ == nullptr) { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 81026187d15..3459e47165c 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -69,10 +69,9 @@ class NebulaStore : public KVStore, public Handler { NebulaStore(KVOptions options, std::shared_ptr ioPool, HostAddr serviceAddr, - std::shared_ptr workers) + std::shared_ptr) : ioPool_(ioPool), storeSvcAddr_(serviceAddr), - workers_(workers), raftAddr_(getRaftAddr(serviceAddr)), options_(std::move(options)) { CHECK_NOTNULL(options_.partMan_); From 96658cc4cb26601a13de3290403348b3cc197776 Mon Sep 17 00:00:00 2001 From: zhangshixiang Date: Mon, 6 Mar 2023 15:52:22 +0800 Subject: [PATCH 2/5] raft part use separate thread pool --- src/kvstore/NebulaStore.cpp | 14 ++++++++++---- src/kvstore/NebulaStore.h | 4 +++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index d607946eb9b..c8e360fd734 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -60,7 +60,7 @@ bool NebulaStore::init() { FLAGS_raft_num_worker_threads); pool->setNamePrefix("part-executor"); pool->start(); - workers_ = std::move(pool); + partExecutor_ = std::move(pool); } snapshot_.reset(new NebulaSnapshotManager(this)); raftService_ = raftex::RaftexService::createService(ioPool_, workers_, raftAddr_.port); @@ -473,7 +473,7 @@ std::shared_ptr NebulaStore::newPart(GraphSpaceID spaceId, engine, ioPool_, bgWorkers_, - workers_, + partExecutor_, snapshot_, clientMan_, diskMan_, @@ -632,8 +632,14 @@ std::shared_ptr NebulaStore::newListener(GraphSpaceID spaceId, folly::stringPrintf("%s/%d/%d/wal", options_.listenerPath_.c_str(), spaceId, partId); std::shared_ptr listener; if (type == meta::cpp2::ListenerType::ELASTICSEARCH) { - listener = std::make_shared( - spaceId, partId, raftAddr_, walPath, ioPool_, bgWorkers_, workers_, options_.schemaMan_); + listener = std::make_shared(spaceId, + partId, + raftAddr_, + walPath, + ioPool_, + bgWorkers_, + partExecutor_, + options_.schemaMan_); } else { LOG(FATAL) << "Should not reach here"; return nullptr; diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 3459e47165c..e6af5849ffa 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -69,9 +69,10 @@ class NebulaStore : public KVStore, public Handler { NebulaStore(KVOptions options, std::shared_ptr ioPool, HostAddr serviceAddr, - std::shared_ptr) + std::shared_ptr workers) : ioPool_(ioPool), storeSvcAddr_(serviceAddr), + workers_(workers), raftAddr_(getRaftAddr(serviceAddr)), options_(std::move(options)) { CHECK_NOTNULL(options_.partMan_); @@ -863,6 +864,7 @@ class NebulaStore : public KVStore, public Handler { std::shared_ptr bgWorkers_; HostAddr storeSvcAddr_; std::shared_ptr workers_; + std::shared_ptr partExecutor_; HostAddr raftAddr_; KVOptions options_; From 77ebff230c186ddd892a6273493456a896ebca5e Mon Sep 17 00:00:00 2001 From: shixiangz Date: Thu, 4 May 2023 17:52:01 +0800 Subject: [PATCH 3/5] Prevent the heartbeat time from going back and causing leader lease invalid --- src/kvstore/NebulaStore.cpp | 20 +++----------------- src/kvstore/NebulaStore.h | 1 - src/kvstore/raftex/RaftPart.cpp | 10 +++++++--- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index c7e161967cb..58e6ad31be6 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -27,7 +27,6 @@ DEFINE_bool(auto_remove_invalid_space, true, "whether remove data of invalid spa DECLARE_bool(rocksdb_disable_wal); DECLARE_int32(rocksdb_backup_interval_secs); DECLARE_int32(wal_ttl); -DEFINE_int32(raft_num_worker_threads, 32, "Raft part number of workers"); namespace nebula { namespace kvstore { @@ -51,13 +50,6 @@ bool NebulaStore::init() { bgWorkers_->start(FLAGS_num_workers, "nebula-bgworkers"); storeWorker_ = std::make_shared(); CHECK(storeWorker_->start()); - { - auto pool = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager( - FLAGS_raft_num_worker_threads); - pool->setNamePrefix("part-executor"); - pool->start(); - partExecutor_ = std::move(pool); - } snapshot_.reset(new NebulaSnapshotManager(this)); raftService_ = raftex::RaftexService::createService(ioPool_, workers_, raftAddr_.port); if (raftService_ == nullptr) { @@ -469,7 +461,7 @@ std::shared_ptr NebulaStore::newPart(GraphSpaceID spaceId, engine, ioPool_, bgWorkers_, - partExecutor_, + workers_, snapshot_, clientMan_, diskMan_, @@ -628,14 +620,8 @@ std::shared_ptr NebulaStore::newListener(GraphSpaceID spaceId, folly::stringPrintf("%s/%d/%d/wal", options_.listenerPath_.c_str(), spaceId, partId); std::shared_ptr listener; if (type == meta::cpp2::ListenerType::ELASTICSEARCH) { - listener = std::make_shared(spaceId, - partId, - raftAddr_, - walPath, - ioPool_, - bgWorkers_, - partExecutor_, - options_.schemaMan_); + listener = std::make_shared( + spaceId, partId, raftAddr_, walPath, ioPool_, bgWorkers_, workers_, options_.schemaMan_); } else { LOG(FATAL) << "Should not reach here"; return nullptr; diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index e6af5849ffa..81026187d15 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -864,7 +864,6 @@ class NebulaStore : public KVStore, public Handler { std::shared_ptr bgWorkers_; HostAddr storeSvcAddr_; std::shared_ptr workers_; - std::shared_ptr partExecutor_; HostAddr raftAddr_; KVOptions options_; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 67dc533f922..a48505ffae1 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -1383,6 +1383,7 @@ bool RaftPart::handleElectionResponses(const ElectionResponses& resps, bgWorkers_->addTask( [self = shared_from_this(), proposedTerm] { self->onElected(proposedTerm); }); lastMsgAcceptedTime_ = 0; + lastMsgAcceptedCostMs_ = 0; } commitInThisTerm_ = false; } @@ -2115,9 +2116,12 @@ void RaftPart::sendHeartbeat() { if (numSucceeded >= replica) { VLOG(4) << idStr_ << "Heartbeat is accepted by quorum"; std::lock_guard g(raftLock_); - auto now = time::WallClock::fastNowInMilliSec(); - lastMsgAcceptedCostMs_ = now - startMs; - lastMsgAcceptedTime_ = now; + auto nowTime = static_cast(time::WallClock::fastNowInMilliSec()); + auto nowCostMs = nowTime - startMs; + if (nowTime - nowCostMs >= lastMsgAcceptedTime_ - lastMsgAcceptedCostMs_) { + lastMsgAcceptedCostMs_ = nowCostMs; + lastMsgAcceptedTime_ = nowTime; + } } }); } From 949e6633bda205584519af7e8186f5639412d35e Mon Sep 17 00:00:00 2001 From: shixiangz Date: Tue, 15 Aug 2023 15:47:34 +0800 Subject: [PATCH 4/5] fix some of Optimize the write performance when host is down --- src/kvstore/raftex/Host.cpp | 5 +++-- src/kvstore/raftex/Host.h | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 22bbad341c4..53cb3c4553f 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -426,11 +426,12 @@ folly::Future Host::sendHeartbeat( if (t.hasException()) { using TransportException = apache::thrift::transport::TTransportException; auto exWrapper = std::move(t).exception(); + VLOG(2) << self->idStr_ << "Heartbeat: " << exWrapper.what(); auto exception = exWrapper.get_exception(); - VLOG(2) << self->idStr_ << "Heartbeat: " << exception->what(); // If we keeps receiving NOT_OPEN exception after some HB intervals, // we can assume that the peer is down so we mark paused_ as true - if (exception && exception->getType() == TransportException::NOT_OPEN) { + if (exception && (exception->getType() == TransportException::NOT_OPEN || + exception->getType() == TransportException::TIMED_OUT)) { if (!self->paused_) { auto now = time::WallClock::fastNowInMilliSec(); if (now - self->lastHeartbeatTime_ >= diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h index 49bbf3d9ee8..ca95a1ab1ce 100644 --- a/src/kvstore/raftex/Host.h +++ b/src/kvstore/raftex/Host.h @@ -92,6 +92,7 @@ class Host final : public std::enable_shared_from_this { committedLogId_ = 0; sendingSnapshot_ = false; followerCommittedLogId_ = 0; + lastHeartbeatTime_ = 0; } /** From 860c0b0a474bcb7712ac83cc260ff6c97093a15f Mon Sep 17 00:00:00 2001 From: shixiangz Date: Mon, 18 Sep 2023 17:32:09 +0800 Subject: [PATCH 5/5] update comment --- src/kvstore/raftex/Host.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 53cb3c4553f..3a662c30436 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -428,7 +428,7 @@ folly::Future Host::sendHeartbeat( auto exWrapper = std::move(t).exception(); VLOG(2) << self->idStr_ << "Heartbeat: " << exWrapper.what(); auto exception = exWrapper.get_exception(); - // If we keeps receiving NOT_OPEN exception after some HB intervals, + // If we keeps receiving NOT_OPEN and TIMED_OUT exception after some HB intervals, // we can assume that the peer is down so we mark paused_ as true if (exception && (exception->getType() == TransportException::NOT_OPEN || exception->getType() == TransportException::TIMED_OUT)) {