From 046b522c2be6bcdd4b27828419c5ae82d21fc844 Mon Sep 17 00:00:00 2001 From: sherlockkenan Date: Thu, 10 Jun 2021 12:23:44 +0800 Subject: [PATCH 1/3] fix inconsistency problem in appendlog --- src/kvstore/raftex/RaftPart.cpp | 120 ++++++++++++++++++++----------- src/kvstore/wal/FileBasedWal.cpp | 9 +++ src/kvstore/wal/FileBasedWal.h | 3 + src/kvstore/wal/Wal.h | 4 ++ 4 files changed, 94 insertions(+), 42 deletions(-) diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 4e59b024f21..44267d281fb 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -1548,56 +1548,92 @@ void RaftPart::processAppendLogRequest( } // req.get_last_log_id_sent() >= committedLogId_ - if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) { - LOG(INFO) << idStr_ << "The local last log term is " << lastLogTerm_ - << ", which is different from the leader's prevLogTerm " - << req.get_last_log_term_sent() - << ", the prevLogId is " << req.get_last_log_id_sent() - << ". So need to rollback to last committedLogId_ " << committedLogId_; - if (wal_->rollbackToLog(committedLogId_)) { + if (req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_) { + // nothing to do + // just append log later + } else if (req.get_last_log_id_sent() > lastLogId_) { + // There is a gap + LOG(INFO) << idStr_ << "Local is missing logs from id " << lastLogId_ + << ". Need to catch up"; + resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); + return; + } else { + // check the term + int reqLastLogTerm = wal_->getLogTerm(req.get_last_log_id_sent()); + if (req.get_last_log_term_sent() != reqLastLogTerm) { + LOG(INFO) << idStr_ << "The local log term is " << reqLastLogTerm + << ", which is different from the leader's prevLogTerm " + << req.get_last_log_term_sent() + << ", the prevLogId is " << req.get_last_log_id_sent() + << ". So ask leader to send logs from committedLogId " << committedLogId_; + TermID committedLogTerm = wal_->getLogTerm(committedLogId_); + if (committedLogTerm > 0 ) { + resp.set_last_log_id(committedLogId_); + resp.set_last_log_term(committedLogTerm); + } + resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); + return; + } + } + + // request get_last_log_term_sent == wal[get_last_log_id_sent].log_term + size_t numLogs = req.get_log_str_list().size(); + LogID firstId = req.get_last_log_id_sent() + 1; + + std::vector logEntries (req.get_log_str_list().begin(), + req.get_log_str_list().end()); + + // may be need to rollback wal_ + if (!( req.get_last_log_id_sent() == wal_->lastLogId() && req.get_last_log_term_sent() == wal_->lastLogTerm())) { + // check the diff index in log + size_t diffIndex = 0; + { + std::unique_ptr it = wal_->iterator(firstId,firstId + numLogs); + for (size_t i = 0; i < numLogs && it->valid(); i++, ++(*it), diffIndex ++ ) { + int logTerm = it->logTerm(); + if (req.get_log_term() != logTerm) { + break; + } + } + it.reset(); + } + + // stale log + if (diffIndex == numLogs) { + resp.set_last_log_id(firstId + numLogs - 1); + resp.set_last_log_term(req.get_log_term()); + resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE); + //nothing to append + return; + } + + //rollback the wal_ + if (wal_->rollbackToLog(firstId + diffIndex -1)) { lastLogId_ = wal_->lastLogId(); lastLogTerm_ = wal_->lastLogTerm(); - resp.set_last_log_id(lastLogId_); - resp.set_last_log_term(lastLogTerm_); LOG(INFO) << idStr_ << "Rollback succeeded! lastLogId is " << lastLogId_ << ", logLogTerm is " << lastLogTerm_ << ", committedLogId is " << committedLogId_ << ", term is " << term_; - } - resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); - return; - } else if (req.get_last_log_id_sent() > lastLogId_) { - // There is a gap - LOG(INFO) << idStr_ << "Local is missing logs from id " - << lastLogId_ << ". Need to catch up"; - resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); - return; - } else if (req.get_last_log_id_sent() < lastLogId_) { - // TODO(doodle): This is a potential bug which would cause data not in consensus. In most - // case, we would hit this path when leader append logs to follower and timeout (leader - // would set lastLogIdSent_ = logIdToSend_ - 1 in Host). **But follower actually received - // it successfully**. Which will explain when leader retry to append these logs, the LOG - // belows is printed, and lastLogId_ == req.get_last_log_id_sent() + 1 in the LOG. - // - // In fact we should always rollback to req.get_last_log_id_sent(), and append the logs from - // leader (we can't make promise that the logs in range [req.get_last_log_id_sent() + 1, - // lastLogId_] is same with follower). However, this makes no difference in the above case. - LOG(INFO) << idStr_ << "Stale log! Local lastLogId " << lastLogId_ - << ", lastLogTerm " << lastLogTerm_ - << ", lastLogIdSent " << req.get_last_log_id_sent() - << ", lastLogTermSent " << req.get_last_log_term_sent(); - resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE); - return; + } else { + LOG(ERROR) << idStr_ << "Rollback fail! lastLogId is" << lastLogId_ + << ", logLogTerm is " << lastLogTerm_ + << ", committedLogId is " << committedLogId_ + << ", rollback id is " << firstId + diffIndex -1; + resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); + return; + } + + // update msg + logEntries = std::vector (req.get_log_str_list().begin() + diffIndex, + req.get_log_str_list().end()); + firstId = firstId + diffIndex; + numLogs = numLogs - diffIndex; } // Append new logs - size_t numLogs = req.get_log_str_list().size(); - LogID firstId = req.get_last_log_id_sent() + 1; - VLOG(2) << idStr_ << "Writing log [" << firstId - << ", " << firstId + numLogs - 1 << "] to WAL"; - LogStrListIterator iter(firstId, - req.get_log_term(), - req.get_log_str_list()); + LogStrListIterator iter(firstId, req.get_log_term(), logEntries); + if (wal_->appendLogs(iter)) { if (numLogs != 0) { CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()) << "First Id is " << firstId; @@ -1608,7 +1644,7 @@ void RaftPart::processAppendLogRequest( resp.set_last_log_term(lastLogTerm_); } else { LOG(ERROR) << idStr_ << "Failed to append logs to WAL"; - resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); + resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); return; } diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index 2bf9db1a827..1999e367f86 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -792,5 +792,14 @@ size_t FileBasedWal::accessAllBuffers(std::function fn) return count; } +TermID FileBasedWal::getLogTerm(LogID id) { + TermID term = -1; + auto walIter = std::make_unique(shared_from_this(), id, id); + if (walIter->valid()) { + term = walIter->logTerm(); + } + walIter.reset(); + return term; +} } // namespace wal } // namespace nebula diff --git a/src/kvstore/wal/FileBasedWal.h b/src/kvstore/wal/FileBasedWal.h index e21a17c0323..7bae1828331 100644 --- a/src/kvstore/wal/FileBasedWal.h +++ b/src/kvstore/wal/FileBasedWal.h @@ -79,6 +79,9 @@ class FileBasedWal final : public Wal TermID lastLogTerm() const override { return lastLogTerm_; } + // Return the term of special logId + // if not exist, return -1 + TermID getLogTerm(LogID id) override; // Append one log messages to the WAL // This method **IS NOT** thread-safe diff --git a/src/kvstore/wal/Wal.h b/src/kvstore/wal/Wal.h index 5a33a31a405..a20c72a042e 100644 --- a/src/kvstore/wal/Wal.h +++ b/src/kvstore/wal/Wal.h @@ -29,6 +29,10 @@ class Wal { // Return the term to receive the last log virtual TermID lastLogTerm() const = 0; + // Return the term of special logId + // if not exist, return -1 + virtual TermID getLogTerm(LogID id) = 0; + // Append one log message to the WAL virtual bool appendLog(LogID id, TermID term, From 17290e1353b40669018c04e70240ca673516d8bb Mon Sep 17 00:00:00 2001 From: sherlockkenan Date: Fri, 2 Jul 2021 11:14:44 +0800 Subject: [PATCH 2/3] check memory buffer first and add test case --- src/kvstore/raftex/RaftPart.cpp | 2 +- src/kvstore/wal/FileBasedWal.cpp | 8 +++++ src/kvstore/wal/test/FileBasedWalTest.cpp | 38 +++++++++++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 44267d281fb..df56a43c0c9 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -1584,7 +1584,7 @@ void RaftPart::processAppendLogRequest( req.get_log_str_list().end()); // may be need to rollback wal_ - if (!( req.get_last_log_id_sent() == wal_->lastLogId() && req.get_last_log_term_sent() == wal_->lastLogTerm())) { + if (!(req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_)) { // check the diff index in log size_t diffIndex = 0; { diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index 1999e367f86..2946555dfbc 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -793,6 +793,14 @@ size_t FileBasedWal::accessAllBuffers(std::function fn) } TermID FileBasedWal::getLogTerm(LogID id) { + // check the memory log buffer + for (auto it = buffers_.rbegin(); it != buffers_.rend(); ++it) { + auto buffer = *it; + if (id >= buffer->firstLogId() && id <= buffer->lastLogId()) { + return buffer->getTerm(id - buffer->firstLogId()); + } + } + // check the log file TermID term = -1; auto walIter = std::make_unique(shared_from_this(), id, id); if (walIter->valid()) { diff --git a/src/kvstore/wal/test/FileBasedWalTest.cpp b/src/kvstore/wal/test/FileBasedWalTest.cpp index 8a97dfa6449..40c3292d2ca 100644 --- a/src/kvstore/wal/test/FileBasedWalTest.cpp +++ b/src/kvstore/wal/test/FileBasedWalTest.cpp @@ -578,6 +578,44 @@ TEST(FileBasedWal, LinkTest) { EXPECT_EQ(num + 1, wal->walFiles_.size()); } +TEST(FileBasedWal, getLogTermTest) { + FileBasedWalPolicy policy; + policy.fileSize = 1024L * 1024L; + policy.bufferSize = 1024L * 1024L; + policy.numBuffers = 2; + + TempDir walDir("/tmp/testWal.XXXXXX"); + auto wal = FileBasedWal::getWal(walDir.path(), + "", + policy, + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); + + // Append > 10MB logs in total + for (int i = 1; i <= 10000; i++) { + ASSERT_TRUE(wal->appendLog(i /*id*/, i /*term*/, 0 /*cluster*/, + folly::stringPrintf(kLongMsg, i))); + } + + // in the memory buffer + ASSERT_EQ(10000, wal->getLogTerm(10000)); + // in the file + ASSERT_EQ(4, wal->getLogTerm(4)); + + // Close the wal + wal.reset(); + + // Now let's open it to read + wal = FileBasedWal::getWal(walDir.path(), + "", + policy, + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); + EXPECT_EQ(10, wal->getLogTerm(10)); +} + } // namespace wal } // namespace nebula From 06a816b8b07730fda6fc2be973df9db2feacdfe3 Mon Sep 17 00:00:00 2001 From: sherlockkenan Date: Mon, 5 Jul 2021 18:31:00 +0800 Subject: [PATCH 3/3] refine logEntries implement --- src/kvstore/raftex/RaftPart.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index df56a43c0c9..8944f363196 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -1580,13 +1580,11 @@ void RaftPart::processAppendLogRequest( size_t numLogs = req.get_log_str_list().size(); LogID firstId = req.get_last_log_id_sent() + 1; - std::vector logEntries (req.get_log_str_list().begin(), - req.get_log_str_list().end()); - + size_t diffIndex = 0; // may be need to rollback wal_ if (!(req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_)) { // check the diff index in log - size_t diffIndex = 0; + { std::unique_ptr it = wal_->iterator(firstId,firstId + numLogs); for (size_t i = 0; i < numLogs && it->valid(); i++, ++(*it), diffIndex ++ ) { @@ -1625,13 +1623,15 @@ void RaftPart::processAppendLogRequest( } // update msg - logEntries = std::vector (req.get_log_str_list().begin() + diffIndex, - req.get_log_str_list().end()); firstId = firstId + diffIndex; numLogs = numLogs - diffIndex; } // Append new logs + std::vector logEntries = std::vector ( + std::make_move_iterator(req.get_log_str_list().begin() + diffIndex), + std::make_move_iterator(req.get_log_str_list().end())); + LogStrListIterator iter(firstId, req.get_log_term(), logEntries); if (wal_->appendLogs(iter)) {