diff --git a/src/kvstore/LogEncoder.cpp b/src/kvstore/LogEncoder.cpp index db5adfa08df..170bcaad02d 100644 --- a/src/kvstore/LogEncoder.cpp +++ b/src/kvstore/LogEncoder.cpp @@ -217,8 +217,6 @@ decodeBatchValue(folly::StringPiece encoded) { std::string encodeHost(LogType type, const HostAddr& host) { std::string encoded; - // 15 refers to "255.255.255.255" - encoded.reserve(sizeof(int64_t) + 1 + 15 + sizeof(int)); int64_t ts = time::WallClock::fastNowInMilliSec(); std::string encodedHost; apache::thrift::CompactSerializer::serialize(host, &encodedHost); @@ -232,7 +230,7 @@ std::string encodeHost(LogType type, const HostAddr& host) { HostAddr decodeHost(LogType type, const folly::StringPiece& encoded) { HostAddr addr; - CHECK_GE(encoded.size(), sizeof(int64_t) + 1 + sizeof(size_t) + sizeof(Port)); + CHECK_GE(encoded.size(), sizeof(int64_t) + 1); CHECK(encoded[sizeof(int64_t)] == type); folly::StringPiece raw = encoded; diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 6c3a10cab85..92fe487b6c8 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -212,6 +212,8 @@ void Part::onDiscoverNewLeader(HostAddr nLeader) { std::tuple Part::commitLogs( std::unique_ptr iter, bool wait, bool needLock) { + // Skip any membership change that happened before the start time, because when we start + // up, the peers come from meta, which already contains all previous changes.
SCOPED_TIMER(&execTime_); auto batch = engine_->startBatchWrite(); LogID lastId = kNoCommitLogId; @@ -309,12 +311,26 @@ std::tuple Part::commitLogs( } case OP_TRANS_LEADER: { auto newLeader = decodeHost(OP_TRANS_LEADER, log); - commitTransLeader(newLeader, needLock); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + commitTransLeader(newLeader, needLock); + } else { + VLOG(2) << idStr_ << "Skip commit stale transfer leader " << newLeader + << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " + << ts; + } break; } case OP_REMOVE_PEER: { auto peer = decodeHost(OP_REMOVE_PEER, log); - commitRemovePeer(peer, needLock); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + commitRemovePeer(peer, needLock); + } else { + VLOG(2) << idStr_ << "Skip commit stale remove peer " << peer + << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " + << ts; + } break; } default: { @@ -389,36 +405,63 @@ nebula::cpp2::ErrorCode Part::putCommitMsg(WriteBatch* batch, } bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const std::string& log) { + // Skip any membership change that happened before the start time, because when we start + // up, the peers come from meta, which already contains all previous changes.
VLOG(4) << idStr_ << "logId " << logId << ", termId " << termId << ", clusterId " << clusterId; if (!log.empty()) { switch (log[sizeof(int64_t)]) { case OP_ADD_LEARNER: { auto learner = decodeHost(OP_ADD_LEARNER, log); - LOG(INFO) << idStr_ << "preprocess add learner " << learner; - addLearner(learner, false); - // persist the part learner info in case of storaged restarting - engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner)); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + VLOG(1) << idStr_ << "preprocess add learner " << learner; + addLearner(learner, false); + // persist the part learner info in case of storaged restarting + engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner)); + } else { + VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at " + << startTimeMs_ << ", but the log timestamp is " << ts; + } break; } case OP_TRANS_LEADER: { auto newLeader = decodeHost(OP_TRANS_LEADER, log); - LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader; - preProcessTransLeader(newLeader); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + VLOG(1) << idStr_ << "preprocess trans leader " << newLeader; + preProcessTransLeader(newLeader); + } else { + VLOG(1) << idStr_ << "Skip stale transfer leader " << newLeader + << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " + << ts; + } break; } case OP_ADD_PEER: { auto peer = decodeHost(OP_ADD_PEER, log); - LOG(INFO) << idStr_ << "preprocess add peer " << peer; - addPeer(peer); - engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer)); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + VLOG(1) << idStr_ << "preprocess add peer " << peer; + addPeer(peer); + engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer)); + } else { + VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at " + << startTimeMs_ << ", but the log timestamp is " << ts; + } break; } 
case OP_REMOVE_PEER: { auto peer = decodeHost(OP_REMOVE_PEER, log); - LOG(INFO) << idStr_ << "preprocess remove peer " << peer; - preProcessRemovePeer(peer); - // remove peer in the persist info - engine_->updatePart(partId_, Peer(peer, Peer::Status::kDeleted)); + auto ts = getTimestamp(log); + if (ts > startTimeMs_) { + VLOG(1) << idStr_ << "preprocess remove peer " << peer; + preProcessRemovePeer(peer); + // remove peer in the persist info + engine_->updatePart(partId_, Peer(peer, Peer::Status::kDeleted)); + } else { + VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at " + << startTimeMs_ << ", but the log timestamp is " << ts; + } break; } default: { diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index dbcfdd90a2d..d9e493df81f 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -43,6 +43,7 @@ folly::Future AdminClient::transLeader(GraphSpaceID spaceId, } auto target = dst; if (dst == kRandomPeer) { + // pick an alive host as the target when dst is not specified for (auto& p : peers) { if (p != src) { auto retCode = ActiveHostsMan::isLived(kv_, p); @@ -53,6 +54,9 @@ folly::Future AdminClient::transLeader(GraphSpaceID spaceId, } } } + if (target == kRandomPeer) { + return Status::Error("No active peers found"); + } req.new_leader_ref() = std::move(target); return getResponseFromPart( Utils::getAdminAddrFromStoreAddr(src),