Skip to content

Commit

Permalink
fix apply outdate membership change (#4107)
Browse files Browse the repository at this point in the history
* fix apply outdate membership change

* fix transfer leader to '':0 cause crash

Co-authored-by: Sophie <[email protected]>
  • Loading branch information
critical27 and Sophie-Xie authored Apr 6, 2022
1 parent daaed0a commit 6b20961
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 18 deletions.
4 changes: 1 addition & 3 deletions src/kvstore/LogEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,6 @@ decodeBatchValue(folly::StringPiece encoded) {

std::string encodeHost(LogType type, const HostAddr& host) {
std::string encoded;
// 15 refers to "255.255.255.255"
encoded.reserve(sizeof(int64_t) + 1 + 15 + sizeof(int));
int64_t ts = time::WallClock::fastNowInMilliSec();
std::string encodedHost;
apache::thrift::CompactSerializer::serialize(host, &encodedHost);
Expand All @@ -232,7 +230,7 @@ std::string encodeHost(LogType type, const HostAddr& host) {
HostAddr decodeHost(LogType type, const folly::StringPiece& encoded) {
HostAddr addr;

CHECK_GE(encoded.size(), sizeof(int64_t) + 1 + sizeof(size_t) + sizeof(Port));
CHECK_GE(encoded.size(), sizeof(int64_t) + 1);
CHECK(encoded[sizeof(int64_t)] == type);

folly::StringPiece raw = encoded;
Expand Down
73 changes: 58 additions & 15 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ void Part::onDiscoverNewLeader(HostAddr nLeader) {

std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
std::unique_ptr<LogIterator> iter, bool wait, bool needLock) {
// We should apply any membership change which happens before start time. Because when we start
// up, the peers comes from meta, has already contains all previous changes.
SCOPED_TIMER(&execTime_);
auto batch = engine_->startBatchWrite();
LogID lastId = kNoCommitLogId;
Expand Down Expand Up @@ -309,12 +311,26 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
}
case OP_TRANS_LEADER: {
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
commitTransLeader(newLeader, needLock);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
commitTransLeader(newLeader, needLock);
} else {
VLOG(2) << idStr_ << "Skip commit stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
commitRemovePeer(peer, needLock);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
commitRemovePeer(peer, needLock);
} else {
VLOG(2) << idStr_ << "Skip commit stale remove peer " << peer
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
break;
}
default: {
Expand Down Expand Up @@ -389,36 +405,63 @@ nebula::cpp2::ErrorCode Part::putCommitMsg(WriteBatch* batch,
}

bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const std::string& log) {
// We should apply any membership change which happens before start time. Because when we start
// up, the peers comes from meta, has already contains all previous changes.
VLOG(4) << idStr_ << "logId " << logId << ", termId " << termId << ", clusterId " << clusterId;
if (!log.empty()) {
switch (log[sizeof(int64_t)]) {
case OP_ADD_LEARNER: {
auto learner = decodeHost(OP_ADD_LEARNER, log);
LOG(INFO) << idStr_ << "preprocess add learner " << learner;
addLearner(learner, false);
// persist the part learner info in case of storaged restarting
engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner));
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess add learner " << learner;
addLearner(learner, false);
// persist the part learner info in case of storaged restarting
engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner));
} else {
VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
break;
}
case OP_TRANS_LEADER: {
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
} else {
VLOG(1) << idStr_ << "Skip stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
break;
}
case OP_ADD_PEER: {
auto peer = decodeHost(OP_ADD_PEER, log);
LOG(INFO) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer));
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer));
} else {
VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
LOG(INFO) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
// remove peer in the persist info
engine_->updatePart(partId_, Peer(peer, Peer::Status::kDeleted));
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
// remove peer in the persist info
engine_->updatePart(partId_, Peer(peer, Peer::Status::kDeleted));
} else {
VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
break;
}
default: {
Expand Down
4 changes: 4 additions & 0 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
}
auto target = dst;
if (dst == kRandomPeer) {
// pick a alive host as target when dst not specified
for (auto& p : peers) {
if (p != src) {
auto retCode = ActiveHostsMan::isLived(kv_, p);
Expand All @@ -53,6 +54,9 @@ folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
}
}
}
if (target == kRandomPeer) {
return Status::Error("No active peers found");
}
req.new_leader_ref() = std::move(target);
return getResponseFromPart(
Utils::getAdminAddrFromStoreAddr(src),
Expand Down

0 comments on commit 6b20961

Please sign in to comment.