diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index 36ba695bb60..faaa73f6a99 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -484,6 +484,12 @@ void RaftPart::stop() {
   VLOG(1) << idStr_ << "Partition has been stopped";
 }
 
+std::pair<TermID, RaftPart::Role> RaftPart::getTermAndRole() const {
+  std::lock_guard<std::mutex> g(raftLock_);
+  std::pair<TermID, Role> res = {term_, role_};
+  return res;
+}
+
 void RaftPart::cleanWal() {
   std::lock_guard<std::mutex> g(raftLock_);
   wal()->cleanWAL(committedLogId_);
@@ -1462,12 +1468,6 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
     return;
   }
 
-  if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) {
-    VLOG(3) << idStr_ << "The partition is still waiting snapshot";
-    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_WAITING_SNAPSHOT;
-    return;
-  }
-
   VLOG(1) << idStr_ << "The partition currently is a " << roleStr(role_) << ", lastLogId "
           << lastLogId_ << ", lastLogTerm " << lastLogTerm_ << ", committedLogId "
           << committedLogId_ << ", term " << term_;
@@ -1952,18 +1952,19 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
     resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_NOT_READY;
     return;
   }
-  // Check leadership
-  nebula::cpp2::ErrorCode err = verifyLeader(req);
-  // Set term_ again because it may be modified in verifyLeader
-  resp.current_term_ref() = term_;
-  if (err != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    // Wrong leadership
-    VLOG(3) << idStr_ << "Will not follow the leader";
-    resp.error_code_ref() = err;
-    return;
-  }
+  resp.current_term_ref() = req.get_current_term();
   if (status_ != Status::WAITING_SNAPSHOT) {
     VLOG(2) << idStr_ << "Begin to receive the snapshot";
+    // Check leadership
+    nebula::cpp2::ErrorCode err = verifyLeader(req);
+    // Set term_ again because it may be modified in verifyLeader
+    resp.current_term_ref() = term_;
+    if (err != nebula::cpp2::ErrorCode::SUCCEEDED) {
+      // Wrong leadership
+      VLOG(3) << idStr_ << "Will not follow the leader";
+      resp.error_code_ref() = err;
+      return;
+    }
     reset();
     status_ = Status::WAITING_SNAPSHOT;
     lastSnapshotCommitId_ = req.get_committed_log_id();
@@ -1978,6 +1979,11 @@
     resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_WAITING_SNAPSHOT;
     return;
   }
+  if (term_ > req.get_current_term()) {
+    VLOG(2) << idStr_ << "leader changed, new term " << term_
+            << " but continue to receive snapshot from old leader " << req.get_leader_addr()
+            << " of term " << req.get_current_term();
+  }
   lastSnapshotRecvDur_.reset();
   // TODO(heng): Maybe we should save them into one sst firstly?
   auto ret = commitSnapshot(
@@ -2034,6 +2040,10 @@ void RaftPart::sendHeartbeat() {
   decltype(hosts_) hosts;
   {
     std::lock_guard<std::mutex> g(raftLock_);
+    nebula::cpp2::ErrorCode rc = canAppendLogs();
+    if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
+      return;
+    }
     currTerm = term_;
     commitLogId = committedLogId_;
     prevLogTerm = lastLogTerm_;
diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h
index a495cdf1c20..b2bd3c5a921 100644
--- a/src/kvstore/raftex/RaftPart.h
+++ b/src/kvstore/raftex/RaftPart.h
@@ -436,6 +436,12 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
   using Status = cpp2::Status;
   using Role = cpp2::Role;
 
+  /**
+   * @brief Return the current term and role, read atomically under raftLock_
+   * @return pair of {term_, role_}
+   */
+  std::pair<TermID, Role> getTermAndRole() const;
+
   /**
    * @brief The str of the RaftPart, used in logging
    */
diff --git a/src/kvstore/raftex/SnapshotManager.cpp b/src/kvstore/raftex/SnapshotManager.cpp
index 0a38eb93e7b..ad2ebfbff44 100644
--- a/src/kvstore/raftex/SnapshotManager.cpp
+++ b/src/kvstore/raftex/SnapshotManager.cpp
@@ -36,7 +36,13 @@ folly::Future<StatusOr<std::pair<LogID, TermID>>> SnapshotManager::sendSnapshot(
   executor_->add([this, p = std::move(p), part, dst]() mutable {
     auto spaceId = part->spaceId_;
     auto partId = part->partId_;
-    auto termId = part->term_;
+    auto tr = part->getTermAndRole();
+    if (tr.second != RaftPart::Role::LEADER) {
+      VLOG(1) << part->idStr_ << "leader changed, term " << tr.first << ", do not send snapshot to "
+              << dst;
+      return;
+    }
+    auto termId = tr.first;
     const auto& localhost = part->address();
     accessAllRowsInSnapshot(
         spaceId,