Skip to content

Commit

Permalink
fix split brain in raft (#4479)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
liwenhui-soul and Sophie-Xie authored Aug 22, 2022
1 parent 403aaa4 commit ff8daf1
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 17 deletions.
42 changes: 26 additions & 16 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,12 @@ void RaftPart::stop() {
VLOG(1) << idStr_ << "Partition has been stopped";
}

std::pair<TermID, RaftPart::Role> RaftPart::getTermAndRole() const {
std::lock_guard<std::mutex> g(raftLock_);
std::pair<TermID, RaftPart::Role> res = {term_, role_};
return res;
}

void RaftPart::cleanWal() {
std::lock_guard<std::mutex> g(raftLock_);
wal()->cleanWAL(committedLogId_);
Expand Down Expand Up @@ -1462,12 +1468,6 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
return;
}

if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) {
VLOG(3) << idStr_ << "The partition is still waiting snapshot";
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_WAITING_SNAPSHOT;
return;
}

VLOG(1) << idStr_ << "The partition currently is a " << roleStr(role_) << ", lastLogId "
<< lastLogId_ << ", lastLogTerm " << lastLogTerm_ << ", committedLogId "
<< committedLogId_ << ", term " << term_;
Expand Down Expand Up @@ -1952,18 +1952,19 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_NOT_READY;
return;
}
// Check leadership
nebula::cpp2::ErrorCode err = verifyLeader<cpp2::SendSnapshotRequest>(req);
// Set term_ again because it may be modified in verifyLeader
resp.current_term_ref() = term_;
if (err != nebula::cpp2::ErrorCode::SUCCEEDED) {
// Wrong leadership
VLOG(3) << idStr_ << "Will not follow the leader";
resp.error_code_ref() = err;
return;
}
resp.current_term_ref() = req.get_current_term();
if (status_ != Status::WAITING_SNAPSHOT) {
VLOG(2) << idStr_ << "Begin to receive the snapshot";
// Check leadership
nebula::cpp2::ErrorCode err = verifyLeader<cpp2::SendSnapshotRequest>(req);
// Set term_ again because it may be modified in verifyLeader
resp.current_term_ref() = term_;
if (err != nebula::cpp2::ErrorCode::SUCCEEDED) {
// Wrong leadership
VLOG(3) << idStr_ << "Will not follow the leader";
resp.error_code_ref() = err;
return;
}
reset();
status_ = Status::WAITING_SNAPSHOT;
lastSnapshotCommitId_ = req.get_committed_log_id();
Expand All @@ -1978,6 +1979,11 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_WAITING_SNAPSHOT;
return;
}
if (term_ > req.get_current_term()) {
VLOG(2) << idStr_ << "leader changed, new term " << term_
<< " but continue to receive snapshot from old leader " << req.get_leader_addr()
<< " of term " << req.get_current_term();
}
lastSnapshotRecvDur_.reset();
// TODO(heng): Maybe we should save them into one sst firstly?
auto ret = commitSnapshot(
Expand Down Expand Up @@ -2034,6 +2040,10 @@ void RaftPart::sendHeartbeat() {
decltype(hosts_) hosts;
{
std::lock_guard<std::mutex> g(raftLock_);
nebula::cpp2::ErrorCode rc = canAppendLogs();
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
return;
}
currTerm = term_;
commitLogId = committedLogId_;
prevLogTerm = lastLogTerm_;
Expand Down
6 changes: 6 additions & 0 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
using Status = cpp2::Status;
using Role = cpp2::Role;

/**
* @brief return term and role
* @return
*/
std::pair<TermID, Role> getTermAndRole() const;

/**
* @brief The str of the RaftPart, used in logging
*/
Expand Down
8 changes: 7 additions & 1 deletion src/kvstore/raftex/SnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ folly::Future<StatusOr<std::pair<LogID, TermID>>> SnapshotManager::sendSnapshot(
executor_->add([this, p = std::move(p), part, dst]() mutable {
auto spaceId = part->spaceId_;
auto partId = part->partId_;
auto termId = part->term_;
auto tr = part->getTermAndRole();
if (tr.second != RaftPart::Role::LEADER) {
VLOG(1) << part->idStr_ << "leader changed, term " << tr.first << ", do not send snapshot to "
<< dst;
return;
}
auto termId = tr.first;
const auto& localhost = part->address();
accessAllRowsInSnapshot(
spaceId,
Expand Down

0 comments on commit ff8daf1

Please sign in to comment.