From 6b73ba1a636618de01cad4f19d7f9ed7d90a415f Mon Sep 17 00:00:00 2001 From: dangleptr <37216992+dangleptr@users.noreply.github.com> Date: Tue, 16 Jul 2019 11:59:11 +0800 Subject: [PATCH] Remove the raftlock_ when appending logs into raft queue (#604) * Remove the raftlock_ when appending logs into raft queue * Fix race condition for term --- src/kvstore/raftex/RaftPart.cpp | 170 ++++++++++++++++---------------- src/kvstore/raftex/RaftPart.h | 16 +-- 2 files changed, 95 insertions(+), 91 deletions(-) diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index e43038ec7d3..d617e185e94 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -39,9 +39,11 @@ using nebula::wal::BufferFlusher; class AppendLogsIterator final : public LogIterator { public: AppendLogsIterator(LogID firstLogId, + TermID termId, RaftPart::LogCache logs, std::function casCB) : firstLogId_(firstLogId) + , termId_(termId) , logId_(firstLogId) , logs_(std::move(logs)) , casCB_(std::move(casCB)) { @@ -75,7 +77,7 @@ class AppendLogsIterator final : public LogIterator { bool processCAS() { while (idx_ < logs_.size()) { auto& tup = logs_.at(idx_); - auto logType = std::get<2>(tup); + auto logType = std::get<1>(tup); if (logType != LogType::CAS) { // Not a CAS return false; @@ -83,7 +85,7 @@ class AppendLogsIterator final : public LogIterator { // Process CAS log CHECK(!!casCB_); - casResult_ = casCB_(std::get<3>(tup)); + casResult_ = casCB_(std::get<2>(tup)); if (casResult_.size() > 0) { // CAS Succeeded return true; @@ -126,8 +128,7 @@ class AppendLogsIterator final : public LogIterator { } TermID logTerm() const override { - DCHECK(valid()); - return std::get<1>(logs_.at(idx_)); + return termId_; } ClusterID logSource() const override { @@ -140,7 +141,7 @@ class AppendLogsIterator final : public LogIterator { if (currLogType_ == LogType::CAS) { return casResult_; } else { - return std::get<3>(logs_.at(idx_)); + return std::get<2>(logs_.at(idx_)); } } @@ -163,7 +164,7 @@ class AppendLogsIterator final : public LogIterator { } LogType logType() const { - return std::get<2>(logs_.at(idx_)); + return std::get<1>(logs_.at(idx_)); } private: @@ -175,6 +176,7 @@ class AppendLogsIterator final : public LogIterator { LogType currLogType_{LogType::NORMAL}; std::string casResult_; LogID firstLogId_; + TermID termId_; LogID logId_; RaftPart::LogCache logs_; std::function casCB_; @@ -299,9 +301,8 @@ void RaftPart::stop() { } -AppendLogResult RaftPart::canAppendLogs( - std::lock_guard& lck) { - UNUSED(lck); +AppendLogResult RaftPart::canAppendLogs() { + CHECK(!raftLock_.try_lock()); if (status_ == Status::STARTING) { LOG(ERROR) << idStr_ << "The partition is still starting"; return AppendLogResult::E_NOT_READY; @@ -340,21 +341,10 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, LogType logType, std::string log) { LogCache swappedOutLogs; - LogID firstId; auto retFuture = folly::Future::makeEmpty(); { - std::lock_guard lck(raftLock_); - - auto res = canAppendLogs(lck); - if (res != AppendLogResult::SUCCEEDED) { - LOG(ERROR) << idStr_ - << "Cannot append logs, clean the buffer"; - cachingPromise_.setValue(std::move(res)); - cachingPromise_.reset(); - logs_.clear(); - return res; - } + std::lock_guard lck(logsLock_); VLOG(2) << idStr_ << "Checking whether buffer overflow"; @@ -370,7 +360,7 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, // Append new logs to the buffer DCHECK_GE(source, 0); - logs_.emplace_back(source, term_, logType, std::move(log)); + logs_.emplace_back(source, logType, std::move(log)); switch (logType) { case LogType::CAS: retFuture = cachingPromise_.getSingleFuture(); @@ -382,22 +372,37 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, retFuture = cachingPromise_.getSharedFuture(); break; } - if (replicatingLogs_) { - VLOG(2) << idStr_ - << "Another AppendLogs request is ongoing," - " just return"; - return retFuture; - } else { + + bool expected = false; + if (replicatingLogs_.compare_exchange_strong(expected, true)) { // We need to send logs to all followers VLOG(2) << idStr_ << "Preparing to send AppendLog request"; - replicatingLogs_ = true; sendingPromise_ = std::move(cachingPromise_); cachingPromise_.reset(); std::swap(swappedOutLogs, logs_); - firstId = lastLogId_ + 1; + } else { + VLOG(2) << idStr_ + << "Another AppendLogs request is ongoing," + " just return"; + return retFuture; } } + LogID firstId = 0; + TermID termId = 0; + { + std::lock_guard g(raftLock_); + auto res = canAppendLogs(); + if (res != AppendLogResult::SUCCEEDED) { + LOG(ERROR) << idStr_ + << "Cannot append logs, clean the buffer"; + sendingPromise_.setValue(std::move(res)); + replicatingLogs_ = false; + return res; + } + firstId = lastLogId_ + 1; + termId = term_; + } // Replicate buffered logs to all followers // Replication will happen on a separate thread and will block // until majority accept the logs, the leadership changes, or @@ -405,6 +410,7 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, VLOG(2) << idStr_ << "Calling appendLogsInternal()"; AppendLogsIterator it( firstId, + termId, std::move(swappedOutLogs), [this] (const std::string& msg) -> std::string { auto res = compareAndSet(msg); @@ -414,13 +420,13 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, } return res; }); - appendLogsInternal(std::move(it)); + appendLogsInternal(std::move(it), termId); return retFuture; } -void RaftPart::appendLogsInternal(AppendLogsIterator iter) { +void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) { TermID currTerm = 0; LogID prevLogId = 0; TermID prevLogTerm = 0; @@ -440,8 +446,6 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter) { // The partition is not running VLOG(2) << idStr_ << "The partition is stopped"; sendingPromise_.setValue(AppendLogResult::E_STOPPED); - cachingPromise_.setValue(AppendLogResult::E_STOPPED); - logs_.clear(); replicatingLogs_ = false; return; } @@ -450,8 +454,12 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter) { // Is not a leader any more VLOG(2) << idStr_ << "The leader has changed"; sendingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER); - cachingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER); - logs_.clear(); + replicatingLogs_ = false; + return; + } + if (term_ != termId) { + VLOG(2) << idStr_ << "Term has been updated, origin " << termId << ", new " << term_; + sendingPromise_.setValue(AppendLogResult::E_TERM_OUT_OF_DATE); replicatingLogs_ = false; return; } @@ -501,8 +509,6 @@ void RaftPart::replicateLogs(folly::EventBase* eb, // The partition is not running VLOG(2) << idStr_ << "The partition is stopped"; sendingPromise_.setValue(AppendLogResult::E_STOPPED); - cachingPromise_.setValue(AppendLogResult::E_STOPPED); - logs_.clear(); replicatingLogs_ = false; return; } @@ -511,8 +517,6 @@ void RaftPart::replicateLogs(folly::EventBase* eb, // Is not a leader any more VLOG(2) << idStr_ << "The leader has changed"; sendingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER); - cachingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER); - logs_.clear(); replicatingLogs_ = false; return; } @@ -615,14 +619,13 @@ void RaftPart::processAppendLogResponses( VLOG(2) << idStr_ << numSucceeded << " hosts have accepted the logs"; + LogID firstLogId = 0; { std::lock_guard g(raftLock_); if (status_ != Status::RUNNING) { // The partition is not running VLOG(2) << idStr_ << "The partition is stopped"; sendingPromise_.setValue(AppendLogResult::E_STOPPED); - cachingPromise_.setValue(AppendLogResult::E_STOPPED); - logs_.clear(); replicatingLogs_ = false; return; } @@ -631,8 +634,6 @@ void RaftPart::processAppendLogResponses( // Is not a leader any more VLOG(2) << idStr_ << "The leader has changed"; sendingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER); - cachingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER); - logs_.clear(); replicatingLogs_ = false; return; } @@ -651,51 +652,51 @@ void RaftPart::processAppendLogResponses( // Step 3: Commit the batch if (commitLogs(std::move(walIt))) { committedLogId_ = lastLogId; - - // Step 4: Fulfill the promise - if (iter.hasNonCASLogs()) { - sendingPromise_.setOneSharedValue(AppendLogResult::SUCCEEDED); - } - if (iter.leadByCAS()) { - sendingPromise_.setOneSingleValue(AppendLogResult::SUCCEEDED); - } - VLOG(2) << idStr_ << "Succeeded in committing the logs"; - - // Step 5: Check whether need to continue - // the log replication - CHECK(replicatingLogs_); - // Continue to process the original AppendLogsIterator if necessary - iter.resume(); - if (iter.empty()) { - if (logs_.size() > 0) { - // continue to replicate the logs - sendingPromise_ = std::move(cachingPromise_); - cachingPromise_.reset(); - iter = AppendLogsIterator( - lastLogId_ + 1, - std::move(logs_), - [this] (const std::string& log) -> std::string { - auto res = compareAndSet(log); - if (res.empty()) { - // Failed - sendingPromise_.setOneSingleValue( - AppendLogResult::E_CAS_FAILURE); - } - return res; - }); - logs_.clear(); - } else { - replicatingLogs_ = false; - VLOG(2) << idStr_ << "No more log to be replicated"; - } - } + firstLogId = lastLogId_ + 1; } else { LOG(FATAL) << idStr_ << "Failed to commit logs"; } + VLOG(2) << idStr_ << "Succeeded in committing the logs"; + } + // Step 4: Fulfill the promise + if (iter.hasNonCASLogs()) { + sendingPromise_.setOneSharedValue(AppendLogResult::SUCCEEDED); + } + if (iter.leadByCAS()) { + sendingPromise_.setOneSingleValue(AppendLogResult::SUCCEEDED); + } + // Step 5: Check whether need to continue + // the log replication + CHECK(replicatingLogs_); + // Continue to process the original AppendLogsIterator if necessary + iter.resume(); + if (iter.empty()) { + std::lock_guard lck(logsLock_); + if (logs_.size() > 0) { + // continue to replicate the logs + sendingPromise_ = std::move(cachingPromise_); + cachingPromise_.reset(); + iter = AppendLogsIterator( + firstLogId, + currTerm, + std::move(logs_), + [this] (const std::string& log) -> std::string { + auto res = compareAndSet(log); + if (res.empty()) { + // Failed + sendingPromise_.setOneSingleValue( + AppendLogResult::E_CAS_FAILURE); + } + return res; + }); + logs_.clear(); + } else { + replicatingLogs_ = false; + VLOG(2) << idStr_ << "No more log to be replicated"; + } } - if (!iter.empty()) { - appendLogsInternal(std::move(iter)); + appendLogsInternal(std::move(iter), currTerm); } } else { // Not enough hosts accepted the log, re-try @@ -898,6 +899,7 @@ bool RaftPart::leaderElection() { } LOG(FATAL) << "Should not reach here"; + return false; } diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 312c472c57f..c0fcbf6e697 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -227,10 +227,9 @@ class RaftPart : public std::enable_shared_from_this { // resp -- AppendLogResponse using AppendLogResponses = std::vector; - // + // using LogCache = std::vector< std::tuple>; @@ -278,13 +277,13 @@ class RaftPart : public std::enable_shared_from_this { // Check whether new logs can be appended // Pre-condition: The caller needs to hold the raftLock_ - AppendLogResult canAppendLogs(std::lock_guard& lck); + AppendLogResult canAppendLogs(); folly::Future appendLogAsync(ClusterID source, LogType logType, std::string log); - void appendLogsInternal(AppendLogsIterator iter); + void appendLogsInternal(AppendLogsIterator iter, TermID termId); void replicateLogs( folly::EventBase* eb, @@ -394,13 +393,16 @@ class RaftPart : public std::enable_shared_from_this { peerHosts_; size_t quorum_{0}; + // The lock is used to protect logs_ and cachingPromise_ + mutable std::mutex logsLock_; + std::atomic_bool replicatingLogs_{false}; + PromiseSet cachingPromise_; + LogCache logs_; + // Partition level lock to synchronize the access of the partition mutable std::mutex raftLock_; - bool replicatingLogs_{false}; - PromiseSet cachingPromise_; PromiseSet sendingPromise_; - LogCache logs_; Status status_; Role role_;