Skip to content

Commit

Permalink
Prevent the heartbeat time from going back and causing leader lease i…
Browse files Browse the repository at this point in the history
…nvalid
  • Loading branch information
tangyuanzhang committed May 4, 2023
1 parent 6e81584 commit 77ebff2
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 21 deletions.
20 changes: 3 additions & 17 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ DEFINE_bool(auto_remove_invalid_space, true, "whether remove data of invalid spa
DECLARE_bool(rocksdb_disable_wal);
DECLARE_int32(rocksdb_backup_interval_secs);
DECLARE_int32(wal_ttl);
DEFINE_int32(raft_num_worker_threads, 32, "Raft part number of workers");

namespace nebula {
namespace kvstore {
Expand All @@ -51,13 +50,6 @@ bool NebulaStore::init() {
bgWorkers_->start(FLAGS_num_workers, "nebula-bgworkers");
storeWorker_ = std::make_shared<thread::GenericWorker>();
CHECK(storeWorker_->start());
{
auto pool = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager(
FLAGS_raft_num_worker_threads);
pool->setNamePrefix("part-executor");
pool->start();
partExecutor_ = std::move(pool);
}
snapshot_.reset(new NebulaSnapshotManager(this));
raftService_ = raftex::RaftexService::createService(ioPool_, workers_, raftAddr_.port);
if (raftService_ == nullptr) {
Expand Down Expand Up @@ -469,7 +461,7 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
engine,
ioPool_,
bgWorkers_,
partExecutor_,
workers_,
snapshot_,
clientMan_,
diskMan_,
Expand Down Expand Up @@ -628,14 +620,8 @@ std::shared_ptr<Listener> NebulaStore::newListener(GraphSpaceID spaceId,
folly::stringPrintf("%s/%d/%d/wal", options_.listenerPath_.c_str(), spaceId, partId);
std::shared_ptr<Listener> listener;
if (type == meta::cpp2::ListenerType::ELASTICSEARCH) {
listener = std::make_shared<ESListener>(spaceId,
partId,
raftAddr_,
walPath,
ioPool_,
bgWorkers_,
partExecutor_,
options_.schemaMan_);
listener = std::make_shared<ESListener>(
spaceId, partId, raftAddr_, walPath, ioPool_, bgWorkers_, workers_, options_.schemaMan_);
} else {
LOG(FATAL) << "Should not reach here";
return nullptr;
Expand Down
1 change: 0 additions & 1 deletion src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,6 @@ class NebulaStore : public KVStore, public Handler {
std::shared_ptr<thread::GenericThreadPool> bgWorkers_;
HostAddr storeSvcAddr_;
std::shared_ptr<folly::Executor> workers_;
std::shared_ptr<folly::Executor> partExecutor_;
HostAddr raftAddr_;
KVOptions options_;

Expand Down
10 changes: 7 additions & 3 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,7 @@ bool RaftPart::handleElectionResponses(const ElectionResponses& resps,
bgWorkers_->addTask(
[self = shared_from_this(), proposedTerm] { self->onElected(proposedTerm); });
lastMsgAcceptedTime_ = 0;
lastMsgAcceptedCostMs_ = 0;
}
commitInThisTerm_ = false;
}
Expand Down Expand Up @@ -2115,9 +2116,12 @@ void RaftPart::sendHeartbeat() {
if (numSucceeded >= replica) {
VLOG(4) << idStr_ << "Heartbeat is accepted by quorum";
std::lock_guard<std::mutex> g(raftLock_);
auto now = time::WallClock::fastNowInMilliSec();
lastMsgAcceptedCostMs_ = now - startMs;
lastMsgAcceptedTime_ = now;
auto nowTime = static_cast<uint64_t>(time::WallClock::fastNowInMilliSec());
auto nowCostMs = nowTime - startMs;
if (nowTime - nowCostMs >= lastMsgAcceptedTime_ - lastMsgAcceptedCostMs_) {
lastMsgAcceptedCostMs_ = nowCostMs;
lastMsgAcceptedTime_ = nowTime;
}
}
});
}
Expand Down

0 comments on commit 77ebff2

Please sign in to comment.