Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raft problem #2483

Merged
merged 3 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 78 additions & 42 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1548,56 +1548,92 @@ void RaftPart::processAppendLogRequest(
}

// req.get_last_log_id_sent() >= committedLogId_
if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) {
LOG(INFO) << idStr_ << "The local last log term is " << lastLogTerm_
<< ", which is different from the leader's prevLogTerm "
<< req.get_last_log_term_sent()
<< ", the prevLogId is " << req.get_last_log_id_sent()
<< ". So need to rollback to last committedLogId_ " << committedLogId_;
if (wal_->rollbackToLog(committedLogId_)) {
if (req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_) {
// nothing to do
// just append log later
} else if (req.get_last_log_id_sent() > lastLogId_) {
// There is a gap
LOG(INFO) << idStr_ << "Local is missing logs from id " << lastLogId_
<< ". Need to catch up";
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
} else {
// check the term
int reqLastLogTerm = wal_->getLogTerm(req.get_last_log_id_sent());
if (req.get_last_log_term_sent() != reqLastLogTerm) {
LOG(INFO) << idStr_ << "The local log term is " << reqLastLogTerm
<< ", which is different from the leader's prevLogTerm "
<< req.get_last_log_term_sent()
<< ", the prevLogId is " << req.get_last_log_id_sent()
<< ". So ask leader to send logs from committedLogId " << committedLogId_;
TermID committedLogTerm = wal_->getLogTerm(committedLogId_);
if (committedLogTerm > 0 ) {
resp.set_last_log_id(committedLogId_);
resp.set_last_log_term(committedLogTerm);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could consider do not set_last_log_term, IIRC, it is not used, so we don't nedd to wal_->getLogTerm(committedLogId_);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have considered it before, but if we don't set it,then the resp is: (commmitedLogId_,last_log_term).

when leader receive the response, it will send a new request with (commmitedLogId_,last_log_term) not (commmitedLogId_, commmitedLogIdTerm_), see self->lastLogTermSent_ = resp.get_last_log_term();

append log request with (commmitedLogId_,last_log_term) could not pass the check in the follower , which will causing a dead loop

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you're right.

}
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
}
}

// request get_last_log_term_sent == wal[get_last_log_id_sent].log_term
size_t numLogs = req.get_log_str_list().size();
LogID firstId = req.get_last_log_id_sent() + 1;

std::vector<cpp2::LogEntry> logEntries (req.get_log_str_list().begin(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try use std::make_move_iterator here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I have refined it

req.get_log_str_list().end());

// may be need to rollback wal_
if (!(req.get_last_log_id_sent() == lastLogId_ && req.get_last_log_term_sent() == lastLogTerm_)) {
// check the diff index in log
size_t diffIndex = 0;
{
std::unique_ptr<LogIterator> it = wal_->iterator(firstId,firstId + numLogs);
for (size_t i = 0; i < numLogs && it->valid(); i++, ++(*it), diffIndex ++ ) {
int logTerm = it->logTerm();
if (req.get_log_term() != logTerm) {
break;
}
}
it.reset();
}

// stale log
if (diffIndex == numLogs) {
resp.set_last_log_id(firstId + numLogs - 1);
resp.set_last_log_term(req.get_log_term());
resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE);
//nothing to append
return;
}

//rollback the wal_
if (wal_->rollbackToLog(firstId + diffIndex -1)) {
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
resp.set_last_log_id(lastLogId_);
resp.set_last_log_term(lastLogTerm_);
LOG(INFO) << idStr_ << "Rollback succeeded! lastLogId is " << lastLogId_
<< ", logLogTerm is " << lastLogTerm_
<< ", committedLogId is " << committedLogId_
<< ", term is " << term_;
}
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
} else if (req.get_last_log_id_sent() > lastLogId_) {
// There is a gap
LOG(INFO) << idStr_ << "Local is missing logs from id "
<< lastLogId_ << ". Need to catch up";
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
} else if (req.get_last_log_id_sent() < lastLogId_) {
// TODO(doodle): This is a potential bug which would cause data not in consensus. In most
// case, we would hit this path when leader append logs to follower and timeout (leader
// would set lastLogIdSent_ = logIdToSend_ - 1 in Host). **But follower actually received
// it successfully**. Which will explain when leader retry to append these logs, the LOG
// belows is printed, and lastLogId_ == req.get_last_log_id_sent() + 1 in the LOG.
//
// In fact we should always rollback to req.get_last_log_id_sent(), and append the logs from
// leader (we can't make promise that the logs in range [req.get_last_log_id_sent() + 1,
// lastLogId_] is same with follower). However, this makes no difference in the above case.
LOG(INFO) << idStr_ << "Stale log! Local lastLogId " << lastLogId_
<< ", lastLogTerm " << lastLogTerm_
<< ", lastLogIdSent " << req.get_last_log_id_sent()
<< ", lastLogTermSent " << req.get_last_log_term_sent();
resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE);
return;
} else {
LOG(ERROR) << idStr_ << "Rollback fail! lastLogId is" << lastLogId_
<< ", logLogTerm is " << lastLogTerm_
<< ", committedLogId is " << committedLogId_
<< ", rollback id is " << firstId + diffIndex -1;
resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
return;
}

// update msg
logEntries = std::vector<cpp2::LogEntry> (req.get_log_str_list().begin() + diffIndex,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be quite expensive as well, emm, can't figure out a better way for now.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have refined it to avoid copy

req.get_log_str_list().end());
firstId = firstId + diffIndex;
numLogs = numLogs - diffIndex;
}

// Append new logs
size_t numLogs = req.get_log_str_list().size();
LogID firstId = req.get_last_log_id_sent() + 1;
VLOG(2) << idStr_ << "Writing log [" << firstId
<< ", " << firstId + numLogs - 1 << "] to WAL";
LogStrListIterator iter(firstId,
req.get_log_term(),
req.get_log_str_list());
LogStrListIterator iter(firstId, req.get_log_term(), logEntries);

if (wal_->appendLogs(iter)) {
if (numLogs != 0) {
CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()) << "First Id is " << firstId;
Expand All @@ -1608,7 +1644,7 @@ void RaftPart::processAppendLogRequest(
resp.set_last_log_term(lastLogTerm_);
} else {
LOG(ERROR) << idStr_ << "Failed to append logs to WAL";
resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
return;
}

Expand Down
17 changes: 17 additions & 0 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -792,5 +792,22 @@ size_t FileBasedWal::accessAllBuffers(std::function<bool(BufferPtr buffer)> fn)
return count;
}

TermID FileBasedWal::getLogTerm(LogID id) {
// check the memory log buffer
for (auto it = buffers_.rbegin(); it != buffers_.rend(); ++it) {
auto buffer = *it;
if (id >= buffer->firstLogId() && id <= buffer->lastLogId()) {
return buffer->getTerm(id - buffer->firstLogId());
}
}
// check the log file
TermID term = -1;
auto walIter = std::make_unique<FileBasedWalIterator>(shared_from_this(), id, id);
critical27 marked this conversation as resolved.
Show resolved Hide resolved
if (walIter->valid()) {
term = walIter->logTerm();
}
walIter.reset();
return term;
}
} // namespace wal
} // namespace nebula
3 changes: 3 additions & 0 deletions src/kvstore/wal/FileBasedWal.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class FileBasedWal final : public Wal
TermID lastLogTerm() const override {
return lastLogTerm_;
}
// Return the term of special logId
// if not exist, return -1
TermID getLogTerm(LogID id) override;

// Append one log messages to the WAL
// This method **IS NOT** thread-safe
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/wal/Wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class Wal {
// Return the term to receive the last log
virtual TermID lastLogTerm() const = 0;

// Return the term of special logId
// if not exist, return -1
virtual TermID getLogTerm(LogID id) = 0;

// Append one log message to the WAL
virtual bool appendLog(LogID id,
TermID term,
Expand Down
38 changes: 38 additions & 0 deletions src/kvstore/wal/test/FileBasedWalTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,44 @@ TEST(FileBasedWal, LinkTest) {
EXPECT_EQ(num + 1, wal->walFiles_.size());
}

TEST(FileBasedWal, getLogTermTest) {
FileBasedWalPolicy policy;
policy.fileSize = 1024L * 1024L;
policy.bufferSize = 1024L * 1024L;
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");
auto wal = FileBasedWal::getWal(walDir.path(),
"",
policy,
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});

// Append > 10MB logs in total
for (int i = 1; i <= 10000; i++) {
ASSERT_TRUE(wal->appendLog(i /*id*/, i /*term*/, 0 /*cluster*/,
folly::stringPrintf(kLongMsg, i)));
}

// in the memory buffer
ASSERT_EQ(10000, wal->getLogTerm(10000));
// in the file
ASSERT_EQ(4, wal->getLogTerm(4));

// Close the wal
wal.reset();

// Now let's open it to read
wal = FileBasedWal::getWal(walDir.path(),
"",
policy,
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(10, wal->getLogTerm(10));
}

} // namespace wal
} // namespace nebula

Expand Down