Remove the raftlock_ when appending logs into raft queue (#604)
* Remove the raftlock_ when appending logs into raft queue
* Fix race condition for term
dangleptr authored Jul 16, 2019
1 parent 2a6373e commit 6b73ba1
Showing 2 changed files with 95 additions and 91 deletions.
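The core of the change: appendLogAsync no longer takes raftLock_ just to buffer a log. A dedicated logsLock_ guards the log buffer and cachingPromise_, an atomic replicatingLogs_ flag is claimed with compare_exchange_strong to decide which caller drives replication, and the term plus the first log id are snapshotted under raftLock_ only after the role is claimed. The following is a minimal, self-contained sketch of that pattern; the names BufferedReplicator, enqueue, and replicate are invented for illustration and are not part of the RaftPart API.

```cpp
#include <atomic>
#include <cstdint>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

using TermID = int64_t;
using LogID  = int64_t;

class BufferedReplicator {
public:
    // Callers only take the buffer lock here; the consensus lock is not needed
    // just to enqueue a log.
    void enqueue(std::string log) {
        std::vector<std::string> batch;
        {
            std::lock_guard<std::mutex> g(logsLock_);
            logs_.emplace_back(std::move(log));
            // Try to claim the replicator role. If another thread already holds
            // it, that thread will drain this log from the buffer later.
            bool expected = false;
            if (!replicatingLogs_.compare_exchange_strong(expected, true)) {
                return;
            }
            std::swap(batch, logs_);
        }

        // Snapshot consensus state under raftLock_ only once the role is ours.
        TermID termSnapshot;
        LogID firstId;
        {
            std::lock_guard<std::mutex> g(raftLock_);
            termSnapshot = term_;
            firstId = lastLogId_ + 1;
        }
        replicate(firstId, termSnapshot, std::move(batch));
    }

private:
    void replicate(LogID /*firstId*/, TermID /*termId*/, std::vector<std::string> /*batch*/) {
        // The real code ships the batch to followers, re-checks under raftLock_
        // that the partition is still a running leader in the captured term, and
        // then either drains newly buffered logs or releases the flag. The sketch
        // only releases the flag.
        replicatingLogs_ = false;
    }

    std::mutex logsLock_;                        // guards logs_ (and cachingPromise_ in RaftPart)
    std::vector<std::string> logs_;
    std::atomic<bool> replicatingLogs_{false};

    std::mutex raftLock_;                        // guards consensus state
    TermID term_{0};
    LogID lastLogId_{0};
};
```

When the compare_exchange fails, the sketch simply returns: whichever thread holds replicatingLogs_ is responsible for draining the buffer before releasing the flag, which is what processAppendLogResponses does in the diff below when it rebuilds the iterator from logs_.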
170 changes: 86 additions & 84 deletions src/kvstore/raftex/RaftPart.cpp
@@ -39,9 +39,11 @@ using nebula::wal::BufferFlusher;
class AppendLogsIterator final : public LogIterator {
public:
AppendLogsIterator(LogID firstLogId,
TermID termId,
RaftPart::LogCache logs,
std::function<std::string(const std::string&)> casCB)
: firstLogId_(firstLogId)
, termId_(termId)
, logId_(firstLogId)
, logs_(std::move(logs))
, casCB_(std::move(casCB)) {
@@ -75,15 +77,15 @@ class AppendLogsIterator final : public LogIterator {
bool processCAS() {
while (idx_ < logs_.size()) {
auto& tup = logs_.at(idx_);
auto logType = std::get<2>(tup);
auto logType = std::get<1>(tup);
if (logType != LogType::CAS) {
// Not a CAS
return false;
}

// Process CAS log
CHECK(!!casCB_);
casResult_ = casCB_(std::get<3>(tup));
casResult_ = casCB_(std::get<2>(tup));
if (casResult_.size() > 0) {
// CAS Succeeded
return true;
@@ -126,8 +128,7 @@ }
}

TermID logTerm() const override {
DCHECK(valid());
return std::get<1>(logs_.at(idx_));
return termId_;
}

ClusterID logSource() const override {
@@ -140,7 +141,7 @@
if (currLogType_ == LogType::CAS) {
return casResult_;
} else {
return std::get<3>(logs_.at(idx_));
return std::get<2>(logs_.at(idx_));
}
}

@@ -163,7 +164,7 @@ }
}

LogType logType() const {
return std::get<2>(logs_.at(idx_));
return std::get<1>(logs_.at(idx_));
}

private:
@@ -175,6 +176,7 @@ LogType currLogType_{LogType::NORMAL};
LogType currLogType_{LogType::NORMAL};
std::string casResult_;
LogID firstLogId_;
TermID termId_;
LogID logId_;
RaftPart::LogCache logs_;
std::function<std::string(const std::string&)> casCB_;
@@ -299,9 +301,8 @@ void RaftPart::stop() {
}


AppendLogResult RaftPart::canAppendLogs(
std::lock_guard<std::mutex>& lck) {
UNUSED(lck);
AppendLogResult RaftPart::canAppendLogs() {
CHECK(!raftLock_.try_lock());
if (status_ == Status::STARTING) {
LOG(ERROR) << idStr_ << "The partition is still starting";
return AppendLogResult::E_NOT_READY;
@@ -340,21 +341,10 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
LogType logType,
std::string log) {
LogCache swappedOutLogs;
LogID firstId;
auto retFuture = folly::Future<AppendLogResult>::makeEmpty();

{
std::lock_guard<std::mutex> lck(raftLock_);

auto res = canAppendLogs(lck);
if (res != AppendLogResult::SUCCEEDED) {
LOG(ERROR) << idStr_
<< "Cannot append logs, clean the buffer";
cachingPromise_.setValue(std::move(res));
cachingPromise_.reset();
logs_.clear();
return res;
}
std::lock_guard<std::mutex> lck(logsLock_);

VLOG(2) << idStr_ << "Checking whether buffer overflow";

@@ -370,7 +360,7 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,

// Append new logs to the buffer
DCHECK_GE(source, 0);
logs_.emplace_back(source, term_, logType, std::move(log));
logs_.emplace_back(source, logType, std::move(log));
switch (logType) {
case LogType::CAS:
retFuture = cachingPromise_.getSingleFuture();
@@ -382,29 +372,45 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
retFuture = cachingPromise_.getSharedFuture();
break;
}
if (replicatingLogs_) {
VLOG(2) << idStr_
<< "Another AppendLogs request is ongoing,"
" just return";
return retFuture;
} else {

bool expected = false;
if (replicatingLogs_.compare_exchange_strong(expected, true)) {
// We need to send logs to all followers
VLOG(2) << idStr_ << "Preparing to send AppendLog request";
replicatingLogs_ = true;
sendingPromise_ = std::move(cachingPromise_);
cachingPromise_.reset();
std::swap(swappedOutLogs, logs_);
firstId = lastLogId_ + 1;
} else {
VLOG(2) << idStr_
<< "Another AppendLogs request is ongoing,"
" just return";
return retFuture;
}
}

LogID firstId = 0;
TermID termId = 0;
{
std::lock_guard<std::mutex> g(raftLock_);
auto res = canAppendLogs();
if (res != AppendLogResult::SUCCEEDED) {
LOG(ERROR) << idStr_
<< "Cannot append logs, clean the buffer";
sendingPromise_.setValue(std::move(res));
replicatingLogs_ = false;
return res;
}
firstId = lastLogId_ + 1;
termId = term_;
}
// Replicate buffered logs to all followers
// Replication will happen on a separate thread and will block
// until majority accept the logs, the leadership changes, or
// the partition stops
VLOG(2) << idStr_ << "Calling appendLogsInternal()";
AppendLogsIterator it(
firstId,
termId,
std::move(swappedOutLogs),
[this] (const std::string& msg) -> std::string {
auto res = compareAndSet(msg);
@@ -414,13 +420,13 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
}
return res;
});
appendLogsInternal(std::move(it));
appendLogsInternal(std::move(it), termId);

return retFuture;
}


void RaftPart::appendLogsInternal(AppendLogsIterator iter) {
void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
TermID currTerm = 0;
LogID prevLogId = 0;
TermID prevLogTerm = 0;
@@ -440,8 +446,6 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter) {
// The partition is not running
VLOG(2) << idStr_ << "The partition is stopped";
sendingPromise_.setValue(AppendLogResult::E_STOPPED);
cachingPromise_.setValue(AppendLogResult::E_STOPPED);
logs_.clear();
replicatingLogs_ = false;
return;
}
@@ -450,8 +454,12 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter) {
// Is not a leader any more
VLOG(2) << idStr_ << "The leader has changed";
sendingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER);
cachingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER);
logs_.clear();
replicatingLogs_ = false;
return;
}
if (term_ != termId) {
VLOG(2) << idStr_ << "Term has been updated, origin " << termId << ", new " << term_;
sendingPromise_.setValue(AppendLogResult::E_TERM_OUT_OF_DATE);
replicatingLogs_ = false;
return;
}
@@ -501,8 +509,6 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
// The partition is not running
VLOG(2) << idStr_ << "The partition is stopped";
sendingPromise_.setValue(AppendLogResult::E_STOPPED);
cachingPromise_.setValue(AppendLogResult::E_STOPPED);
logs_.clear();
replicatingLogs_ = false;
return;
}
@@ -511,8 +517,6 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
// Is not a leader any more
VLOG(2) << idStr_ << "The leader has changed";
sendingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER);
cachingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER);
logs_.clear();
replicatingLogs_ = false;
return;
}
@@ -615,14 +619,13 @@ void RaftPart::processAppendLogResponses(
VLOG(2) << idStr_ << numSucceeded
<< " hosts have accepted the logs";

LogID firstLogId = 0;
{
std::lock_guard<std::mutex> g(raftLock_);
if (status_ != Status::RUNNING) {
// The partition is not running
VLOG(2) << idStr_ << "The partition is stopped";
sendingPromise_.setValue(AppendLogResult::E_STOPPED);
cachingPromise_.setValue(AppendLogResult::E_STOPPED);
logs_.clear();
replicatingLogs_ = false;
return;
}
@@ -631,8 +634,6 @@ void RaftPart::processAppendLogResponses(
// Is not a leader any more
VLOG(2) << idStr_ << "The leader has changed";
sendingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER);
cachingPromise_.setValue(AppendLogResult::E_NOT_A_LEADER);
logs_.clear();
replicatingLogs_ = false;
return;
}
@@ -651,51 +652,51 @@ void RaftPart::processAppendLogResponses(
// Step 3: Commit the batch
if (commitLogs(std::move(walIt))) {
committedLogId_ = lastLogId;

// Step 4: Fulfill the promise
if (iter.hasNonCASLogs()) {
sendingPromise_.setOneSharedValue(AppendLogResult::SUCCEEDED);
}
if (iter.leadByCAS()) {
sendingPromise_.setOneSingleValue(AppendLogResult::SUCCEEDED);
}
VLOG(2) << idStr_ << "Succeeded in committing the logs";

// Step 5: Check whether need to continue
// the log replication
CHECK(replicatingLogs_);
// Continue to process the original AppendLogsIterator if necessary
iter.resume();
if (iter.empty()) {
if (logs_.size() > 0) {
// continue to replicate the logs
sendingPromise_ = std::move(cachingPromise_);
cachingPromise_.reset();
iter = AppendLogsIterator(
lastLogId_ + 1,
std::move(logs_),
[this] (const std::string& log) -> std::string {
auto res = compareAndSet(log);
if (res.empty()) {
// Failed
sendingPromise_.setOneSingleValue(
AppendLogResult::E_CAS_FAILURE);
}
return res;
});
logs_.clear();
} else {
replicatingLogs_ = false;
VLOG(2) << idStr_ << "No more log to be replicated";
}
}
firstLogId = lastLogId_ + 1;
} else {
LOG(FATAL) << idStr_ << "Failed to commit logs";
}
VLOG(2) << idStr_ << "Succeeded in committing the logs";
}
// Step 4: Fulfill the promise
if (iter.hasNonCASLogs()) {
sendingPromise_.setOneSharedValue(AppendLogResult::SUCCEEDED);
}
if (iter.leadByCAS()) {
sendingPromise_.setOneSingleValue(AppendLogResult::SUCCEEDED);
}
// Step 5: Check whether need to continue
// the log replication
CHECK(replicatingLogs_);
// Continue to process the original AppendLogsIterator if necessary
iter.resume();
if (iter.empty()) {
std::lock_guard<std::mutex> lck(logsLock_);
if (logs_.size() > 0) {
// continue to replicate the logs
sendingPromise_ = std::move(cachingPromise_);
cachingPromise_.reset();
iter = AppendLogsIterator(
firstLogId,
currTerm,
std::move(logs_),
[this] (const std::string& log) -> std::string {
auto res = compareAndSet(log);
if (res.empty()) {
// Failed
sendingPromise_.setOneSingleValue(
AppendLogResult::E_CAS_FAILURE);
}
return res;
});
logs_.clear();
} else {
replicatingLogs_ = false;
VLOG(2) << idStr_ << "No more log to be replicated";
}
}

if (!iter.empty()) {
appendLogsInternal(std::move(iter));
appendLogsInternal(std::move(iter), currTerm);
}
} else {
// Not enough hosts accepted the log, re-try
@@ -898,6 +899,7 @@ bool RaftPart::leaderElection() {
}

LOG(FATAL) << "Should not reach here";
return false;
}


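The "Fix race condition for term" part of the commit is visible above as the new termId parameter on appendLogsInternal and the `if (term_ != termId)` guard: the term is captured under raftLock_ when a replication round is claimed, travels with the batch, and a round buffered under an older term is rejected instead of being committed. Below is a stand-alone sketch of that guard under assumed names (PartitionState, beginReplication); it is not the RaftPart implementation.

```cpp
#include <cstdint>
#include <mutex>

using TermID = int64_t;

enum class AppendLogResult { SUCCEEDED, E_TERM_OUT_OF_DATE };

struct PartitionState {
    std::mutex raftLock;   // protects term
    TermID term{0};
};

// termId is the term snapshotted when this replication round was claimed.
AppendLogResult beginReplication(PartitionState& part, TermID termId) {
    std::lock_guard<std::mutex> g(part.raftLock);
    if (part.term != termId) {
        // The partition moved to a newer term (e.g. after a re-election) since
        // the batch was buffered; it must not be committed under the stale term.
        return AppendLogResult::E_TERM_OUT_OF_DATE;
    }
    // ... build AppendLog requests carrying termId and send them to followers ...
    return AppendLogResult::SUCCEEDED;
}
```

A related detail of the diff: the E_STOPPED, E_NOT_A_LEADER, and E_TERM_OUT_OF_DATE paths no longer call cachingPromise_.setValue() or logs_.clear(), because the buffer and its promise are now guarded by logsLock_ and are handed to whichever caller next claims replicatingLogs_.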
16 changes: 9 additions & 7 deletions src/kvstore/raftex/RaftPart.h
@@ -227,10 +227,9 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
// resp -- AppendLogResponse
using AppendLogResponses = std::vector<cpp2::AppendLogResponse>;

// <source, term, logType, log>
// <source, logType, log>
using LogCache = std::vector<
std::tuple<ClusterID,
TermID,
LogType,
std::string>>;

@@ -278,13 +277,13 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {

// Check whether new logs can be appended
// Pre-condition: The caller needs to hold the raftLock_
AppendLogResult canAppendLogs(std::lock_guard<std::mutex>& lck);
AppendLogResult canAppendLogs();

folly::Future<AppendLogResult> appendLogAsync(ClusterID source,
LogType logType,
std::string log);

void appendLogsInternal(AppendLogsIterator iter);
void appendLogsInternal(AppendLogsIterator iter, TermID termId);

void replicateLogs(
folly::EventBase* eb,
@@ -394,13 +393,16 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
peerHosts_;
size_t quorum_{0};

// The lock is used to protect logs_ and cachingPromise_
mutable std::mutex logsLock_;
std::atomic_bool replicatingLogs_{false};
PromiseSet<AppendLogResult> cachingPromise_;
LogCache logs_;

// Partition level lock to synchronize the access of the partition
mutable std::mutex raftLock_;

bool replicatingLogs_{false};
PromiseSet<AppendLogResult> cachingPromise_;
PromiseSet<AppendLogResult> sendingPromise_;
LogCache logs_;

Status status_;
Role role_;