From 046b522c2be6bcdd4b27828419c5ae82d21fc844 Mon Sep 17 00:00:00 2001 From: sherlockkenan Date: Thu, 10 Jun 2021 12:23:44 +0800 Subject: [PATCH] fix inconsistency problem in appendlog --- src/kvstore/raftex/RaftPart.cpp | 120 ++++++++++++++++++++----------- src/kvstore/wal/FileBasedWal.cpp | 9 +++ src/kvstore/wal/FileBasedWal.h | 3 + src/kvstore/wal/Wal.h | 4 ++ 4 files changed, 94 insertions(+), 42 deletions(-) diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 4e59b024f21..44267d281fb 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -1548,56 +1548,92 @@ void RaftPart::processAppendLogRequest( } // req.get_last_log_id_sent() >= committedLogId_ - if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) { - LOG(INFO) << idStr_ << "The local last log term is " << lastLogTerm_ - << ", which is different from the leader's prevLogTerm " - << req.get_last_log_term_sent() - << ", the prevLogId is " << req.get_last_log_id_sent() - << ". So need to rollback to last committedLogId_ " << committedLogId_; - if (wal_->rollbackToLog(committedLogId_)) { + if (req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_) { + // nothing to do + // just append log later + } else if (req.get_last_log_id_sent() > lastLogId_) { + // There is a gap + LOG(INFO) << idStr_ << "Local is missing logs from id " << lastLogId_ + << ". Need to catch up"; + resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); + return; + } else { + // check the term + int reqLastLogTerm = wal_->getLogTerm(req.get_last_log_id_sent()); + if (req.get_last_log_term_sent() != reqLastLogTerm) { + LOG(INFO) << idStr_ << "The local log term is " << reqLastLogTerm + << ", which is different from the leader's prevLogTerm " + << req.get_last_log_term_sent() + << ", the prevLogId is " << req.get_last_log_id_sent() + << ". So ask leader to send logs from committedLogId " << committedLogId_; + TermID committedLogTerm = wal_->getLogTerm(committedLogId_); + if (committedLogTerm > 0 ) { + resp.set_last_log_id(committedLogId_); + resp.set_last_log_term(committedLogTerm); + } + resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); + return; + } + } + + // request get_last_log_term_sent == wal[get_last_log_id_sent].log_term + size_t numLogs = req.get_log_str_list().size(); + LogID firstId = req.get_last_log_id_sent() + 1; + + std::vector logEntries (req.get_log_str_list().begin(), + req.get_log_str_list().end()); + + // may be need to rollback wal_ + if (!( req.get_last_log_id_sent() == wal_->lastLogId() && req.get_last_log_term_sent() == wal_->lastLogTerm())) { + // check the diff index in log + size_t diffIndex = 0; + { + std::unique_ptr it = wal_->iterator(firstId,firstId + numLogs); + for (size_t i = 0; i < numLogs && it->valid(); i++, ++(*it), diffIndex ++ ) { + int logTerm = it->logTerm(); + if (req.get_log_term() != logTerm) { + break; + } + } + it.reset(); + } + + // stale log + if (diffIndex == numLogs) { + resp.set_last_log_id(firstId + numLogs - 1); + resp.set_last_log_term(req.get_log_term()); + resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE); + //nothing to append + return; + } + + //rollback the wal_ + if (wal_->rollbackToLog(firstId + diffIndex -1)) { lastLogId_ = wal_->lastLogId(); lastLogTerm_ = wal_->lastLogTerm(); - resp.set_last_log_id(lastLogId_); - resp.set_last_log_term(lastLogTerm_); LOG(INFO) << idStr_ << "Rollback succeeded! lastLogId is " << lastLogId_ << ", logLogTerm is " << lastLogTerm_ << ", committedLogId is " << committedLogId_ << ", term is " << term_; - } - resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); - return; - } else if (req.get_last_log_id_sent() > lastLogId_) { - // There is a gap - LOG(INFO) << idStr_ << "Local is missing logs from id " - << lastLogId_ << ". Need to catch up"; - resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); - return; - } else if (req.get_last_log_id_sent() < lastLogId_) { - // TODO(doodle): This is a potential bug which would cause data not in consensus. In most - // case, we would hit this path when leader append logs to follower and timeout (leader - // would set lastLogIdSent_ = logIdToSend_ - 1 in Host). **But follower actually received - // it successfully**. Which will explain when leader retry to append these logs, the LOG - // belows is printed, and lastLogId_ == req.get_last_log_id_sent() + 1 in the LOG. - // - // In fact we should always rollback to req.get_last_log_id_sent(), and append the logs from - // leader (we can't make promise that the logs in range [req.get_last_log_id_sent() + 1, - // lastLogId_] is same with follower). However, this makes no difference in the above case. - LOG(INFO) << idStr_ << "Stale log! Local lastLogId " << lastLogId_ - << ", lastLogTerm " << lastLogTerm_ - << ", lastLogIdSent " << req.get_last_log_id_sent() - << ", lastLogTermSent " << req.get_last_log_term_sent(); - resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE); - return; + } else { + LOG(ERROR) << idStr_ << "Rollback fail! lastLogId is" << lastLogId_ + << ", logLogTerm is " << lastLogTerm_ + << ", committedLogId is " << committedLogId_ + << ", rollback id is " << firstId + diffIndex -1; + resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); + return; + } + + // update msg + logEntries = std::vector (req.get_log_str_list().begin() + diffIndex, + req.get_log_str_list().end()); + firstId = firstId + diffIndex; + numLogs = numLogs - diffIndex; } // Append new logs - size_t numLogs = req.get_log_str_list().size(); - LogID firstId = req.get_last_log_id_sent() + 1; - VLOG(2) << idStr_ << "Writing log [" << firstId - << ", " << firstId + numLogs - 1 << "] to WAL"; - LogStrListIterator iter(firstId, - req.get_log_term(), - req.get_log_str_list()); + LogStrListIterator iter(firstId, req.get_log_term(), logEntries); + if (wal_->appendLogs(iter)) { if (numLogs != 0) { CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()) << "First Id is " << firstId; @@ -1608,7 +1644,7 @@ void RaftPart::processAppendLogRequest( resp.set_last_log_term(lastLogTerm_); } else { LOG(ERROR) << idStr_ << "Failed to append logs to WAL"; - resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); + resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); return; } diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index 2bf9db1a827..1999e367f86 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -792,5 +792,14 @@ size_t FileBasedWal::accessAllBuffers(std::function fn) return count; } +TermID FileBasedWal::getLogTerm(LogID id) { + TermID term = -1; + auto walIter = std::make_unique(shared_from_this(), id, id); + if (walIter->valid()) { + term = walIter->logTerm(); + } + walIter.reset(); + return term; +} } // namespace wal } // namespace nebula diff --git a/src/kvstore/wal/FileBasedWal.h b/src/kvstore/wal/FileBasedWal.h index e21a17c0323..7bae1828331 100644 --- a/src/kvstore/wal/FileBasedWal.h +++ b/src/kvstore/wal/FileBasedWal.h @@ -79,6 +79,9 @@ class FileBasedWal final : public Wal TermID lastLogTerm() const override { return lastLogTerm_; } + // Return the term of special logId + // if not exist, return -1 + TermID getLogTerm(LogID id) override; // Append one log messages to the WAL // This method **IS NOT** thread-safe diff --git a/src/kvstore/wal/Wal.h b/src/kvstore/wal/Wal.h index 5a33a31a405..a20c72a042e 100644 --- a/src/kvstore/wal/Wal.h +++ b/src/kvstore/wal/Wal.h @@ -29,6 +29,10 @@ class Wal { // Return the term to receive the last log virtual TermID lastLogTerm() const = 0; + // Return the term of special logId + // if not exist, return -1 + virtual TermID getLogTerm(LogID id) = 0; + // Append one log message to the WAL virtual bool appendLog(LogID id, TermID term,