From e1527fec863c8874efb5d468253d77d58363a3b3 Mon Sep 17 00:00:00 2001 From: liwenhui-soul <38217397+liwenhui-soul@users.noreply.github.com> Date: Fri, 21 Jan 2022 11:25:29 +0800 Subject: [PATCH] fix always loop when no log need to send --- src/kvstore/raftex/Host.cpp | 26 +++++++++++++++++++++++++- src/kvstore/raftex/RaftPart.cpp | 6 ++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 33d2d3f7708..9ca2debcd7c 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -276,11 +276,24 @@ Host::prepareAppendLogRequest() { } if (lastLogIdSent_ + 1 > part_->wal()->lastLogId()) { + // send an emtpy log, if the remote host return error and committedLogId, we try again with the + // new lastLogIdSent=committedLogId, then if it still has lastLogIdSent_ + 1 > + // part_->wal()->lastLogId(),the committedLogId must be equal to lastLogIdSent, so try again and + // the remote host would return success this time. LOG_IF(INFO, FLAGS_trace_raft) << idStr_ << "My lastLogId in wal is " << part_->wal()->lastLogId() << ", but you are seeking " << lastLogIdSent_ + 1 << ", so i have nothing to send, logIdToSend_ = " << logIdToSend_; - return nebula::cpp2::ErrorCode::E_RAFT_NO_WAL_FOUND; + auto req = std::make_shared(); + req->space_ref() = part_->spaceId(); + req->part_ref() = part_->partitionId(); + req->current_term_ref() = logTermToSend_; + req->committed_log_id_ref() = committedLogId_; + req->leader_addr_ref() = part_->address().host; + req->leader_port_ref() = part_->address().port; + req->last_log_term_sent_ref() = lastLogTermSent_; + req->last_log_id_sent_ref() = lastLogIdSent_; + return req; } auto it = part_->wal()->iterator(lastLogIdSent_ + 1, logIdToSend_); @@ -303,6 +316,17 @@ Host::prepareAppendLogRequest() { entry.log_term_ref() = it->logTerm(); logs.emplace_back(std::move(entry)); } + while (it->valid()) { + if (it->logTerm() < part_->termId()) { + cpp2::RaftLogEntry entry; + entry.cluster_ref() = it->logSource(); + entry.log_str_ref() = it->logMsg().toString(); + entry.log_term_ref() = it->logTerm(); + logs.emplace_back(std::move(entry)); + } else { + break; + } + } // the last log entry's id is (lastLogIdSent_ + cnt), when iterator is invalid and last log // entry's id is not logIdToSend_, which means the log has been rollbacked if (!it->valid() && (lastLogIdSent_ + static_cast(logs.size()) != logIdToSend_)) { diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 4d49bf60972..f8c76b0087a 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -1001,6 +1001,12 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, LOG_EVERY_N(WARNING, 100) << idStr_ << "Only " << numSucceeded << " hosts succeeded, Need to try again"; usleep(1000); + { + std::lock_guard g(raftLock_); + // wal could be rollback between two cycles, if we still use the old lastLogId, we may always + // get E_RAFT_NO_WAL_FOUND + lastLogId = std::min(wal_->lastLogId(), lastLogId); + } replicateLogs(eb, std::move(iter), currTerm, lastLogId, committedId, prevLogTerm, prevLogId); } }