Skip to content

Commit

Permalink
fix inconsistency problem in appendlog
Browse files Browse the repository at this point in the history
  • Loading branch information
sherlockkenan committed Jun 30, 2021
1 parent 2faf9bd commit 2bc5702
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 42 deletions.
120 changes: 78 additions & 42 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1548,56 +1548,92 @@ void RaftPart::processAppendLogRequest(
}

// req.get_last_log_id_sent() >= committedLogId_
if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) {
LOG(INFO) << idStr_ << "The local last log term is " << lastLogTerm_
<< ", which is different from the leader's prevLogTerm "
<< req.get_last_log_term_sent()
<< ", the prevLogId is " << req.get_last_log_id_sent()
<< ". So need to rollback to last committedLogId_ " << committedLogId_;
if (wal_->rollbackToLog(committedLogId_)) {
if (req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_) {
// nothing to do
// just append log later
} else if (req.get_last_log_id_sent() > lastLogId_) {
// There is a gap
LOG(INFO) << idStr_ << "Local is missing logs from id " << lastLogId_
<< ". Need to catch up";
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
} else {
// check the term
int reqLastLogTerm = wal_->getLogTerm(req.get_last_log_id_sent());
if (req.get_last_log_term_sent() != reqLastLogTerm) {
LOG(INFO) << idStr_ << "The local log term is " << reqLastLogTerm
<< ", which is different from the leader's prevLogTerm "
<< req.get_last_log_term_sent()
<< ", the prevLogId is " << req.get_last_log_id_sent()
<< ". So ask leader to send logs from committedLogId " << committedLogId_;
TermID committedLogTerm = wal_->getLogTerm(committedLogId_);
if (committedLogTerm > 0 ) {
resp.set_last_log_id(committedLogId_);
resp.set_last_log_term(committedLogTerm);
}
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
}
}

// request get_last_log_term_sent == wal[get_last_log_id_sent].log_term
size_t numLogs = req.get_log_str_list().size();
LogID firstId = req.get_last_log_id_sent() + 1;

std::vector<cpp2::LogEntry> logEntries (req.get_log_str_list().begin(),
req.get_log_str_list().end());

// We may need to roll back wal_ first
if (!( req.get_last_log_id_sent() == wal_->lastLogId() && req.get_last_log_term_sent() == wal_->lastLogTerm())) {
// check the diff index in log
size_t diffIndex = 0;
{
std::unique_ptr<LogIterator> it = wal_->iterator(firstId,firstId + numLogs);
for (size_t i = 0; i < numLogs && it->valid(); i++, ++(*it), diffIndex ++ ) {
int logTerm = it->logTerm();
if (req.get_log_term() != logTerm) {
break;
}
}
it.reset();
}

// stale log
if (diffIndex == numLogs) {
resp.set_last_log_id(firstId + numLogs - 1);
resp.set_last_log_term(req.get_log_term());
resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE);
//nothing to append
return;
}

//rollback the wal_
if (wal_->rollbackToLog(firstId + diffIndex -1)) {
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
resp.set_last_log_id(lastLogId_);
resp.set_last_log_term(lastLogTerm_);
LOG(INFO) << idStr_ << "Rollback succeeded! lastLogId is " << lastLogId_
<< ", logLogTerm is " << lastLogTerm_
<< ", committedLogId is " << committedLogId_
<< ", term is " << term_;
}
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
} else if (req.get_last_log_id_sent() > lastLogId_) {
// There is a gap
LOG(INFO) << idStr_ << "Local is missing logs from id "
<< lastLogId_ << ". Need to catch up";
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
} else if (req.get_last_log_id_sent() < lastLogId_) {
// TODO(doodle): This is a potential bug which would cause data not in consensus. In most
// case, we would hit this path when leader append logs to follower and timeout (leader
// would set lastLogIdSent_ = logIdToSend_ - 1 in Host). **But follower actually received
// it successfully**. Which will explain when leader retry to append these logs, the LOG
// belows is printed, and lastLogId_ == req.get_last_log_id_sent() + 1 in the LOG.
//
// In fact we should always rollback to req.get_last_log_id_sent(), and append the logs from
// leader (we can't make promise that the logs in range [req.get_last_log_id_sent() + 1,
// lastLogId_] is same with follower). However, this makes no difference in the above case.
LOG(INFO) << idStr_ << "Stale log! Local lastLogId " << lastLogId_
<< ", lastLogTerm " << lastLogTerm_
<< ", lastLogIdSent " << req.get_last_log_id_sent()
<< ", lastLogTermSent " << req.get_last_log_term_sent();
resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE);
return;
} else {
LOG(ERROR) << idStr_ << "Rollback fail! lastLogId is" << lastLogId_
<< ", logLogTerm is " << lastLogTerm_
<< ", committedLogId is " << committedLogId_
<< ", rollback id is " << firstId + diffIndex -1;
resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
return;
}

// update msg
logEntries = std::vector<cpp2::LogEntry> (req.get_log_str_list().begin() + diffIndex,
req.get_log_str_list().end());
firstId = firstId + diffIndex;
numLogs = numLogs - diffIndex;
}

// Append new logs
size_t numLogs = req.get_log_str_list().size();
LogID firstId = req.get_last_log_id_sent() + 1;
VLOG(2) << idStr_ << "Writing log [" << firstId
<< ", " << firstId + numLogs - 1 << "] to WAL";
LogStrListIterator iter(firstId,
req.get_log_term(),
req.get_log_str_list());
LogStrListIterator iter(firstId, req.get_log_term(), logEntries);

if (wal_->appendLogs(iter)) {
if (numLogs != 0) {
CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()) << "First Id is " << firstId;
Expand All @@ -1608,7 +1644,7 @@ void RaftPart::processAppendLogRequest(
resp.set_last_log_term(lastLogTerm_);
} else {
LOG(ERROR) << idStr_ << "Failed to append logs to WAL";
resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
}

Expand Down
9 changes: 9 additions & 0 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -792,5 +792,14 @@ size_t FileBasedWal::accessAllBuffers(std::function<bool(BufferPtr buffer)> fn)
return count;
}

// Returns the term reported by a WAL iterator positioned on log `id`,
// or -1 when that iterator is not valid.
TermID FileBasedWal::getLogTerm(LogID id) {
  // The iterator is built with `id` as both of its id arguments
  // (presumably the first and last log id to visit -- confirm against
  // FileBasedWalIterator's constructor).
  const auto cursor = std::make_unique<FileBasedWalIterator>(shared_from_this(), id, id);

  // The unique_ptr releases the iterator when this scope ends, so no
  // explicit reset() is required before returning.
  return cursor->valid() ? static_cast<TermID>(cursor->logTerm())
                         : static_cast<TermID>(-1);
}
} // namespace wal
} // namespace nebula
3 changes: 3 additions & 0 deletions src/kvstore/wal/FileBasedWal.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class FileBasedWal final : public Wal
TermID lastLogTerm() const override {
return lastLogTerm_;
}
// Return the term of the specified log id.
// If the log does not exist, return -1.
TermID getLogTerm(LogID id) override;

// Append one log messages to the WAL
// This method **IS NOT** thread-safe
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/wal/Wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class Wal {
// Return the term to receive the last log
virtual TermID lastLogTerm() const = 0;

// Return the term of the specified log id.
// If the log does not exist, return -1.
virtual TermID getLogTerm(LogID id) = 0;

// Append one log message to the WAL
virtual bool appendLog(LogID id,
TermID term,
Expand Down

0 comments on commit 2bc5702

Please sign in to comment.