Raft problem #2483

Merged
merged 3 commits into from
Jul 23, 2021
120 changes: 78 additions & 42 deletions src/kvstore/raftex/RaftPart.cpp
@@ -1548,56 +1548,92 @@ void RaftPart::processAppendLogRequest(
}

// req.get_last_log_id_sent() >= committedLogId_
if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) {
LOG(INFO) << idStr_ << "The local last log term is " << lastLogTerm_
<< ", which is different from the leader's prevLogTerm "
<< req.get_last_log_term_sent()
<< ", the prevLogId is " << req.get_last_log_id_sent()
<< ". So need to rollback to last committedLogId_ " << committedLogId_;
if (wal_->rollbackToLog(committedLogId_)) {
if (req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_) {
// nothing to do
// just append log later
} else if (req.get_last_log_id_sent() > lastLogId_) {
// There is a gap
LOG(INFO) << idStr_ << "Local is missing logs from id " << lastLogId_
<< ". Need to catch up";
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
} else {
// check the term
int reqLastLogTerm = wal_->getLogTerm(req.get_last_log_id_sent());
if (req.get_last_log_term_sent() != reqLastLogTerm) {
LOG(INFO) << idStr_ << "The local log term is " << reqLastLogTerm
<< ", which is different from the leader's prevLogTerm "
<< req.get_last_log_term_sent()
<< ", the prevLogId is " << req.get_last_log_id_sent()
<< ". So ask leader to send logs from committedLogId " << committedLogId_;
TermID committedLogTerm = wal_->getLogTerm(committedLogId_);
if (committedLogTerm > 0 ) {
resp.set_last_log_id(committedLogId_);
resp.set_last_log_term(committedLogTerm);
Contributor

Consider not calling set_last_log_term. IIRC it is not used, so we would not need wal_->getLogTerm(committedLogId_);

Author

I considered that before, but if we don't set it, the response is (committedLogId_, last_log_term).

When the leader receives the response, it sends a new request with (committedLogId_, last_log_term) rather than (committedLogId_, term of committedLogId_), see self->lastLogTermSent_ = resp.get_last_log_term();

An append log request with (committedLogId_, last_log_term) cannot pass the check in the follower, which causes a dead loop.

Contributor

Yeah, you're right.
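
The exchange above can be sketched in a few lines. This is a minimal model, not the code in RaftPart.cpp: `Response`, `followerCheck`, `roundsToMatch`, and the `std::map` log are hypothetical stand-ins, and the sketch assumes that a reject response without an explicitly set term ends up carrying the follower's own last log term.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical, simplified stand-ins for the types used in the diff.
using LogID = int64_t;
using TermID = int64_t;

struct Response {
  bool ok;
  LogID lastLogId;
  TermID lastLogTerm;
};

// Follower side: accept only if (prevLogId, prevLogTerm) matches the local log.
// `log` maps log id -> term. `fillTerm` selects whether a reject response
// carries the term of committedLogId (true) or, as an assumption of this
// sketch, the follower's own last log term (false).
inline Response followerCheck(const std::map<LogID, TermID>& log, LogID committedLogId,
                              LogID prevLogId, TermID prevLogTerm, bool fillTerm) {
  assert(log.count(committedLogId) == 1);
  auto it = log.find(prevLogId);
  if (it != log.end() && it->second == prevLogTerm) {
    return {true, prevLogId, prevLogTerm};
  }
  // Mismatch: ask the leader to resend from committedLogId.
  TermID term = fillTerm ? log.at(committedLogId) : log.rbegin()->second;
  return {false, committedLogId, term};
}

// Leader side: retry with whatever the follower reported.
// Returns the number of rounds until the follower accepts, or -1 if it
// still rejects after maxRounds.
inline int roundsToMatch(const std::map<LogID, TermID>& followerLog, LogID committedLogId,
                         LogID prevLogId, TermID prevLogTerm, bool fillTerm, int maxRounds) {
  for (int round = 1; round <= maxRounds; ++round) {
    Response r = followerCheck(followerLog, committedLogId, prevLogId, prevLogTerm, fillTerm);
    if (r.ok) {
      return round;
    }
    prevLogId = r.lastLogId;
    prevLogTerm = r.lastLogTerm;  // mirrors lastLogTermSent_ = resp.get_last_log_term()
  }
  return -1;
}
```

With a follower log of {1: term 1, 2: term 1, 3: term 2}, committedLogId 2, and a leader probing with (3, term 3), the model converges on the second round when the term is filled in and keeps rejecting when it is not.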

}
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
}
}

// request get_last_log_term_sent == wal[get_last_log_id_sent].log_term
size_t numLogs = req.get_log_str_list().size();
LogID firstId = req.get_last_log_id_sent() + 1;

std::vector<cpp2::LogEntry> logEntries (req.get_log_str_list().begin(),
Contributor

Try using std::make_move_iterator here.

Author

Yeah, I have refined it.
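
For readers unfamiliar with the suggestion, here is a minimal sketch of building a vector by moving elements out of another one. `LogEntry` and `takeEntries` are hypothetical names for illustration, not the types in the diff, and the PR's final code may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <string>
#include <vector>

// Hypothetical stand-in for cpp2::LogEntry.
struct LogEntry {
  std::string payload;
};

// Moves the entries [first, src.size()) out of `src` into a new vector.
// The moved-from elements are erased from `src`, so it keeps only [0, first).
inline std::vector<LogEntry> takeEntries(std::vector<LogEntry>& src, std::size_t first = 0) {
  assert(first <= src.size());
  auto from = src.begin() + static_cast<std::ptrdiff_t>(first);
  std::vector<LogEntry> out(std::make_move_iterator(from),
                            std::make_move_iterator(src.end()));
  src.erase(from, src.end());
  return out;
}
```

One caveat: move iterators only help when the source is mutable. If the list is reached through a const reference (which a getter such as req.get_log_str_list() may return; that is an assumption about the generated API), the elements are silently copied instead.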

req.get_log_str_list().end());

// may be need to rollback wal_
if (!( req.get_last_log_id_sent() == wal_->lastLogId() && req.get_last_log_term_sent() == wal_->lastLogTerm())) {
Contributor @critical27, Jul 1, 2021

Why don't we check the same condition as in line 1551, req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_?

My point is that when it is true, we don't need to check the term of each log, according to the Log Matching property in the paper.

Author

Yes, checking with

req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_

is better. I will modify it later.

I agree that when it is true, there is no need to check the term of each log. That is why I added this condition check:

if (!( req.get_last_log_id_sent() == wal_->lastLogId() && req.get_last_log_term_sent() == wal_->lastLogTerm()))

In most cases the check above is false, so we do not need to check the log terms one by one and can just append the new logs.

Contributor @critical27, Jul 2, 2021

"In most cases the check above is false, so we do not need to check the log terms one by one and can just append the new logs."

Agree.

The reason I think lastLogId_ and lastLogTerm_ are better is this: if a host goes from follower to leader and back to follower, it could have written some logs to the WAL during the leader phase (although this case is rare).

Author

I get it.
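
The fast path and the term-by-term comparison discussed in this thread can be sketched as follows. This is a simplified model under stated assumptions: the local log is a plain vector of terms (a hypothetical replacement for the WAL iterator), every incoming entry carries the same term `reqTerm` as in the diff, and the caller has already verified the prevLogTerm when the ids match.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using LogID = int64_t;
using TermID = int64_t;

// localTerms[i] holds the term of log id i + 1 (hypothetical in-memory log).
// Returns how many of the `numLogs` incoming entries, which start at id
// prevLogId + 1 and all carry `reqTerm`, already exist locally with that term.
inline std::size_t firstDiffIndex(const std::vector<TermID>& localTerms, LogID prevLogId,
                                  TermID reqTerm, std::size_t numLogs) {
  assert(prevLogId >= 0);
  const std::size_t lastLogId = localTerms.size();
  const std::size_t prev = static_cast<std::size_t>(prevLogId);
  // Fast path: the request extends exactly the local tail, so by the
  // Log Matching property there is nothing to compare.
  if (prev == lastLogId) {
    return 0;
  }
  std::size_t diff = 0;
  for (; diff < numLogs; ++diff) {
    const std::size_t id = prev + 1 + diff;  // 1-based log id
    if (id > lastLogId || localTerms[id - 1] != reqTerm) {
      break;
    }
  }
  return diff;
}
```

In terms of the diff, a result equal to numLogs corresponds to the stale-log case (nothing to append); any smaller result is the index from which the incoming entries would replace the local ones after rolling back to prevLogId + diff.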

// check the diff index in log
size_t diffIndex = 0;
{
std::unique_ptr<LogIterator> it = wal_->iterator(firstId,firstId + numLogs);
for (size_t i = 0; i < numLogs && it->valid(); i++, ++(*it), diffIndex ++ ) {
int logTerm = it->logTerm();
if (req.get_log_term() != logTerm) {
break;
}
}
it.reset();
}

// stale log
if (diffIndex == numLogs) {
resp.set_last_log_id(firstId + numLogs - 1);
resp.set_last_log_term(req.get_log_term());
resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE);
//nothing to append
return;
}

//rollback the wal_
if (wal_->rollbackToLog(firstId + diffIndex -1)) {
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
resp.set_last_log_id(lastLogId_);
resp.set_last_log_term(lastLogTerm_);
LOG(INFO) << idStr_ << "Rollback succeeded! lastLogId is " << lastLogId_
<< ", logLogTerm is " << lastLogTerm_
<< ", committedLogId is " << committedLogId_
<< ", term is " << term_;
}
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
} else if (req.get_last_log_id_sent() > lastLogId_) {
// There is a gap
LOG(INFO) << idStr_ << "Local is missing logs from id "
<< lastLogId_ << ". Need to catch up";
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
} else if (req.get_last_log_id_sent() < lastLogId_) {
// TODO(doodle): This is a potential bug which would cause data not in consensus. In most
// case, we would hit this path when leader append logs to follower and timeout (leader
// would set lastLogIdSent_ = logIdToSend_ - 1 in Host). **But follower actually received
// it successfully**. Which will explain when leader retry to append these logs, the LOG
// belows is printed, and lastLogId_ == req.get_last_log_id_sent() + 1 in the LOG.
//
// In fact we should always rollback to req.get_last_log_id_sent(), and append the logs from
// leader (we can't make promise that the logs in range [req.get_last_log_id_sent() + 1,
// lastLogId_] is same with follower). However, this makes no difference in the above case.
LOG(INFO) << idStr_ << "Stale log! Local lastLogId " << lastLogId_
<< ", lastLogTerm " << lastLogTerm_
<< ", lastLogIdSent " << req.get_last_log_id_sent()
<< ", lastLogTermSent " << req.get_last_log_term_sent();
resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE);
return;
} else {
LOG(ERROR) << idStr_ << "Rollback fail! lastLogId is" << lastLogId_
<< ", logLogTerm is " << lastLogTerm_
<< ", committedLogId is " << committedLogId_
<< ", rollback id is " << firstId + diffIndex -1;
resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
return;
}

// update msg
logEntries = std::vector<cpp2::LogEntry> (req.get_log_str_list().begin() + diffIndex,
Contributor

This could be quite expensive as well. I can't think of a better way for now.

Author

I have refined it to avoid the copy.
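
The thread does not show how the copy was avoided. One possible approach, sketched here purely as an assumption, is a non-owning view that skips the first diffIndex entries instead of building a second vector. `LogTailView` is a hypothetical name and is not part of the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical non-owning view over entries [offset, logs.size()).
// No entry is copied; the referenced vector must outlive the view.
class LogTailView {
 public:
  LogTailView(const std::vector<std::string>& logs, std::size_t offset)
      : logs_(logs), offset_(offset) {
    assert(offset <= logs.size());
  }

  // Number of entries visible through the view.
  std::size_t size() const { return logs_.size() - offset_; }

  // Entry i of the view is entry offset + i of the underlying vector.
  const std::string& operator[](std::size_t i) const { return logs_[offset_ + i]; }

 private:
  const std::vector<std::string>& logs_;
  std::size_t offset_;
};
```

An iterator handed to the WAL could then walk such a view starting at firstId + diffIndex, at the cost of tying its lifetime to the request object.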

req.get_log_str_list().end());
firstId = firstId + diffIndex;
numLogs = numLogs - diffIndex;
}

// Append new logs
size_t numLogs = req.get_log_str_list().size();
LogID firstId = req.get_last_log_id_sent() + 1;
VLOG(2) << idStr_ << "Writing log [" << firstId
<< ", " << firstId + numLogs - 1 << "] to WAL";
LogStrListIterator iter(firstId,
req.get_log_term(),
req.get_log_str_list());
LogStrListIterator iter(firstId, req.get_log_term(), logEntries);

if (wal_->appendLogs(iter)) {
if (numLogs != 0) {
CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()) << "First Id is " << firstId;
@@ -1608,7 +1644,7 @@ void RaftPart::processAppendLogRequest(
resp.set_last_log_term(lastLogTerm_);
} else {
LOG(ERROR) << idStr_ << "Failed to append logs to WAL";
resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
}

9 changes: 9 additions & 0 deletions src/kvstore/wal/FileBasedWal.cpp
@@ -792,5 +792,14 @@ size_t FileBasedWal::accessAllBuffers(std::function<bool(BufferPtr buffer)> fn)
return count;
}

TermID FileBasedWal::getLogTerm(LogID id) {
TermID term = -1;
auto walIter = std::make_unique<FileBasedWalIterator>(shared_from_this(), id, id);
if (walIter->valid()) {
term = walIter->logTerm();
}
walIter.reset();
return term;
}
} // namespace wal
} // namespace nebula
3 changes: 3 additions & 0 deletions src/kvstore/wal/FileBasedWal.h
@@ -79,6 +79,9 @@ class FileBasedWal final : public Wal
TermID lastLogTerm() const override {
return lastLogTerm_;
}
// Return the term of special logId
// if not exist, return -1
TermID getLogTerm(LogID id) override;

// Append one log messages to the WAL
// This method **IS NOT** thread-safe
4 changes: 4 additions & 0 deletions src/kvstore/wal/Wal.h
@@ -29,6 +29,10 @@ class Wal {
// Return the term to receive the last log
virtual TermID lastLogTerm() const = 0;

// Return the term of special logId
// if not exist, return -1
virtual TermID getLogTerm(LogID id) = 0;

// Append one log message to the WAL
virtual bool appendLog(LogID id,
TermID term,