From d8f0e7c3d29869ea48b75f35a39cff50bf62cd51 Mon Sep 17 00:00:00 2001
From: heng
Date: Wed, 15 Jul 2020 17:01:57 +0800
Subject: [PATCH 1/2] Check peers when the added part already exists

---
 src/interface/storage.thrift                 |  1 +
 src/kvstore/NebulaStore.cpp                  | 54 +++++++++++++-------
 src/kvstore/NebulaStore.h                    |  8 ++-
 src/kvstore/Part.cpp                         | 24 ++++++---
 src/kvstore/PartManager.cpp                  |  2 +-
 src/kvstore/PartManager.h                    | 13 ++++-
 src/kvstore/raftex/RaftPart.cpp              |  6 ++-
 src/kvstore/raftex/test/MemberChangeTest.cpp |  6 ++-
 src/meta/processors/admin/AdminClient.cpp    | 27 ++++++++--
 src/meta/processors/admin/BalancePlan.cpp    |  4 ++
 src/meta/processors/admin/BalanceTask.cpp    | 14 ++++-
 src/storage/admin/AdminProcessor.h           |  7 ++-
 12 files changed, 127 insertions(+), 39 deletions(-)

diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift
index f25c20a8d5d..f93f65e661a 100644
--- a/src/interface/storage.thrift
+++ b/src/interface/storage.thrift
@@ -236,6 +236,7 @@ struct AddPartReq {
     1: common.GraphSpaceID space_id,
     2: common.PartitionID part_id,
     3: bool as_learner,
+    4: list<common.HostAddr> peers,
 }
 
 struct RemovePartReq {
diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp
index 5a9386e16e0..404b83d47d4 100644
--- a/src/kvstore/NebulaStore.cpp
+++ b/src/kvstore/NebulaStore.cpp
@@ -248,12 +248,20 @@ void NebulaStore::addSpace(GraphSpaceID spaceId) {
 }
 
 
-void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) {
+void NebulaStore::addPart(GraphSpaceID spaceId,
+                          PartitionID partId,
+                          bool asLearner,
+                          const std::vector<HostAddr>& peers) {
     folly::RWSpinLock::WriteHolder wh(&lock_);
     auto spaceIt = this->spaces_.find(spaceId);
     CHECK(spaceIt != this->spaces_.end()) << "Space should exist!";
-    if (spaceIt->second->parts_.find(partId) != spaceIt->second->parts_.end()) {
-        LOG(INFO) << "[" << spaceId << "," << partId << "] has existed!";
+    auto partIt = spaceIt->second->parts_.find(partId);
+    if (partIt != spaceIt->second->parts_.end()) {
+        LOG(INFO) << "[Space: " << spaceId << ", Part: " << partId << "] already exists!";
+        if (!peers.empty()) {
+            LOG(INFO) << "[Space: " << spaceId << ", Part: " << partId << "] check peers...";
+            partIt->second->checkAndResetPeers(peers);
+        }
         return;
     }
 
@@ -275,7 +283,7 @@ void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearn
     targetEngine->addPart(partId);
     spaceIt->second->parts_.emplace(
         partId,
-        newPart(spaceId, partId, targetEngine.get(), asLearner));
+        newPart(spaceId, partId, targetEngine.get(), asLearner, peers));
     LOG(INFO) << "Space " << spaceId << ", part " << partId
               << " has been added, asLearner " << asLearner;
 }
@@ -283,7 +291,8 @@ void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearn
 std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
                                            PartitionID partId,
                                            KVEngine* engine,
-                                           bool asLearner) {
+                                           bool asLearner,
+                                           const std::vector<HostAddr>& defaultPeers) {
     auto part = std::make_shared<Part>(spaceId,
                                        partId,
                                        raftAddr_,
                                        bgWorkers_,
                                        workers_,
                                        snapshot_);
-    auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
-    if (!metaStatus.ok()) {
-        LOG(ERROR) << "options_.partMan_->partMeta(spaceId, partId); error: "
-                   << metaStatus.status().toString()
-                   << " spaceId: " << spaceId << ", partId: " << partId;
-        return nullptr;
-    }
-
-    auto partMeta = metaStatus.value();
     std::vector<HostAddr> peers;
-    for (auto& h : partMeta.peers_) {
-        if (h != storeSvcAddr_) {
-            peers.emplace_back(getRaftAddr(h));
-            VLOG(1) << "Add peer " << peers.back();
+    if (defaultPeers.empty()) {
+        // pull the information from meta
+        auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
+        if (!metaStatus.ok()) {
+            LOG(ERROR) << "options_.partMan_->partMeta(spaceId, partId); error: "
+                       << metaStatus.status().toString()
+                       << " spaceId: " << spaceId << ", partId: " << partId;
+            return nullptr;
+        }
+
+        auto partMeta = metaStatus.value();
+        for (auto& h : partMeta.peers_) {
+            if (h != storeSvcAddr_) {
+                peers.emplace_back(getRaftAddr(h));
+                VLOG(1) << "Add peer " << peers.back();
+            }
+        }
+    } else {
+        for (auto& h : defaultPeers) {
+            if (h != raftAddr_) {
+                peers.emplace_back(h);
+            }
         }
     }
     raftService_->addPartition(part);
diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h
index e04c6d87fab..52b487f0431 100644
--- a/src/kvstore/NebulaStore.h
+++ b/src/kvstore/NebulaStore.h
@@ -210,7 +210,10 @@ class NebulaStore : public KVStore, public Handler {
      * */
     void addSpace(GraphSpaceID spaceId) override;
 
-    void addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) override;
+    void addPart(GraphSpaceID spaceId,
+                 PartitionID partId,
+                 bool asLearner,
+                 const std::vector<HostAddr>& peers = {}) override;
 
     void removeSpace(GraphSpaceID spaceId) override;
 
@@ -229,7 +232,8 @@ class NebulaStore : public KVStore, public Handler {
     std::shared_ptr<Part> newPart(GraphSpaceID spaceId,
                                   PartitionID partId,
                                   KVEngine* engine,
-                                  bool asLearner);
+                                  bool asLearner,
+                                  const std::vector<HostAddr>& defaultPeers);
 
     ErrorOr<ResultCode, KVEngine*> engine(GraphSpaceID spaceId, PartitionID partId);
 
diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp
index 8c392cb8fc4..d2cd242a56d 100644
--- a/src/kvstore/Part.cpp
+++ b/src/kvstore/Part.cpp
@@ -277,7 +277,9 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
                 if (ts > startTimeMs_) {
                     commitTransLeader(newLeader);
                 } else {
-                    LOG(INFO) << idStr_ << "Skip commit stale transfer leader " << newLeader;
+                    LOG(INFO) << idStr_ << "Skip commit stale transfer leader " << newLeader
+                              << ", the part was opened at " << startTimeMs_
+                              << ", but the log timestamp is " << ts;
                 }
                 break;
             }
@@ -287,7 +289,9 @@
                 if (ts > startTimeMs_) {
                     commitRemovePeer(peer);
                 } else {
-                    LOG(INFO) << idStr_ << "Skip commit stale remove peer " << peer;
+                    LOG(INFO) << idStr_ << "Skip commit stale remove peer " << peer
+                              << ", the part was opened at " << startTimeMs_
+                              << ", but the log timestamp is " << ts;
                 }
                 break;
             }
@@ -364,7 +368,9 @@ bool Part::preProcessLog(LogID logId,
                 LOG(INFO) << idStr_ << "preprocess add learner " << learner;
                 addLearner(learner);
             } else {
-                LOG(INFO) << idStr_ << "Skip stale add learner " << learner;
+                LOG(INFO) << idStr_ << "Skip stale add learner " << learner
+                          << ", the part was opened at " << startTimeMs_
+                          << ", but the log timestamp is " << ts;
             }
             break;
         }
@@ -375,7 +381,9 @@
                 LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader;
                 preProcessTransLeader(newLeader);
             } else {
-                LOG(INFO) << idStr_ << "Skip stale transfer leader " << newLeader;
+                LOG(INFO) << idStr_ << "Skip stale transfer leader " << newLeader
+                          << ", the part was opened at " << startTimeMs_
+                          << ", but the log timestamp is " << ts;
             }
             break;
         }
@@ -386,7 +394,9 @@
                 LOG(INFO) << idStr_ << "preprocess add peer " << peer;
                 addPeer(peer);
             } else {
-                LOG(INFO) << idStr_ << "Skip stale add peer " << peer;
+                LOG(INFO) << idStr_ << "Skip stale add peer " << peer
+                          << ", the part was opened at " << startTimeMs_
+                          << ", but the log timestamp is " << ts;
             }
             break;
         }
@@ -397,7 +407,9 @@ bool Part::preProcessLog(LogID logId,
                 LOG(INFO) << idStr_ << "preprocess remove peer " << peer;
                 preProcessRemovePeer(peer);
             } else {
-                LOG(INFO) << idStr_ << "Skip stale remove peer " << peer;
+                LOG(INFO) << idStr_ << "Skip stale remove peer " << peer
+                          << ", the part was opened at " << startTimeMs_
+                          << ", but the log timestamp is " << ts;
             }
             break;
         }
diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp
index 3bdf20e8acd..c685cc6efa4 100644
--- a/src/kvstore/PartManager.cpp
+++ b/src/kvstore/PartManager.cpp
@@ -142,7 +142,7 @@ void MetaServerBasedPartManager::onSpaceOptionUpdated(
 
 void MetaServerBasedPartManager::onPartAdded(const PartMeta& partMeta) {
     if (handler_ != nullptr) {
-        handler_->addPart(partMeta.spaceId_, partMeta.partId_, false);
+        handler_->addPart(partMeta.spaceId_, partMeta.partId_, false, {});
     } else {
         VLOG(1) << "handler_ is nullptr!";
     }
diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h
index 993b1316f99..5fe3218b4f5 100644
--- a/src/kvstore/PartManager.h
+++ b/src/kvstore/PartManager.h
@@ -18,13 +18,22 @@ namespace kvstore {
 class Handler {
 public:
     virtual ~Handler() = default;
+
     virtual void addSpace(GraphSpaceID spaceId) = 0;
-    virtual void addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) = 0;
+
+    virtual void addPart(GraphSpaceID spaceId,
+                         PartitionID partId,
+                         bool asLearner,
+                         const std::vector<HostAddr>& peers) = 0;
+
     virtual void updateSpaceOption(GraphSpaceID spaceId,
                                    const std::unordered_map<std::string, std::string>& options,
                                    bool isDbOption) = 0;
+
     virtual void removeSpace(GraphSpaceID spaceId) = 0;
+
     virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0;
+
     virtual int32_t allLeader(std::unordered_map<GraphSpaceID,
                                                  std::vector<PartitionID>>& leaderIds) = 0;
 };
@@ -105,7 +114,7 @@ class MemPartManager final : public PartManager {
             handler_->addSpace(spaceId);
         }
         if (noPart && handler_) {
-            handler_->addPart(spaceId, partId, false);
+            handler_->addPart(spaceId, partId, false, {});
         }
     }
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index 972133ca57b..f670c392dc1 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -1060,6 +1060,10 @@ typename RaftPart::Role RaftPart::processElectionResponses(
                        << ", double my election interval.";
             uint64_t curWeight = weight_.load();
             weight_.store(curWeight * 2);
+        } else {
+            LOG(ERROR) << idStr_ << "Receive response about askForVote from "
+                       << hosts[r.first]->address()
+                       << ", error code is " << static_cast<int32_t>(r.second.get_error_code());
         }
     }
 
@@ -1619,7 +1623,7 @@ cpp2::ErrorCode RaftPart::verifyLeader(
         return h->address() == candidate;
     });
     if (it == hosts.end()) {
-        VLOG(2) << idStr_ << "The candidate leader " << candidate << " is not my peers";
+        LOG(INFO) << idStr_ << "The candidate leader " << candidate << " is not one of my peers";
         return cpp2::ErrorCode::E_WRONG_LEADER;
     }
 
diff --git a/src/kvstore/raftex/test/MemberChangeTest.cpp b/src/kvstore/raftex/test/MemberChangeTest.cpp
index 99294b2bd06..cb0e43bec4a 100644
--- a/src/kvstore/raftex/test/MemberChangeTest.cpp
+++ b/src/kvstore/raftex/test/MemberChangeTest.cpp
@@ -59,7 +59,8 @@ TEST(MemberChangeTest, AddRemovePeerTest) {
         LOG(INFO) << "Add the same peer again!";
         auto f = leader->sendCommandAsync(test::encodeAddPeer(allHosts[3]));
         f.wait();
-
+        // sleep a while to ensure the learner receives the command.
+        sleep(1);
         for (auto& c : copies) {
             CHECK_EQ(3, c->hosts_.size());
         }
@@ -68,7 +69,8 @@
         LOG(INFO) << "Remove the peer added!";
         auto f = leader->sendCommandAsync(test::encodeRemovePeer(allHosts[3]));
         f.wait();
-
+        // sleep a while to ensure the learner receives the command.
+        sleep(1);
         for (size_t i = 0; i < copies.size() - 1; i++) {
             CHECK_EQ(2, copies[i]->hosts_.size());
         }
diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp
index 0cc89620d04..cededefacff 100644
--- a/src/meta/processors/admin/AdminClient.cpp
+++ b/src/meta/processors/admin/AdminClient.cpp
@@ -74,6 +74,17 @@ folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
     req.set_space_id(spaceId);
     req.set_part_id(partId);
     req.set_as_learner(asLearner);
+    auto ret = getPeers(spaceId, partId);
+    if (!ret.ok()) {
+        return ret.status();
+    }
+    auto peers = std::move(ret).value();
+    std::vector<nebula::cpp2::HostAddr> thriftPeers;
+    thriftPeers.resize(peers.size());
+    std::transform(peers.begin(), peers.end(), thriftPeers.begin(), [this](const auto& h) {
+        return toThriftHost(h);
+    });
+    req.set_peers(std::move(thriftPeers));
     return getResponse(host, std::move(req), [] (auto client, auto request) {
                return client->future_addPart(request);
            }, [] (auto&& resp) -> Status {
@@ -267,13 +278,17 @@ folly::Future<Status> AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID
     auto fut = pro.getFuture();
     std::vector<folly::Future<Status>> futures;
     for (auto& p : peers) {
+        if (!ActiveHostsMan::isLived(kv_, p)) {
+            LOG(INFO) << "[" << spaceId << ":" << partId << "], Skip the dead host " << p;
+            continue;
+        }
         auto f = getResponse(p, req, [] (auto client, auto request) {
                     return client->future_checkPeers(request);
                 }, [] (auto&& resp) -> Status {
                     if (resp.get_code() == storage::cpp2::ErrorCode::SUCCEEDED) {
                         return Status::OK();
                     } else {
-                        return Status::Error("Add part failed! code=%d",
+                        return Status::Error("Check peers failed! code=%d",
                                              static_cast<int32_t>(resp.get_code()));
                     }
                 });
@@ -316,12 +331,16 @@ folly::Future<Status> AdminClient::getResponse(
                      this] () mutable {
         auto client = clientsMan_->client(host, evb);
         remoteFunc(client, std::move(req)).via(evb)
-            .then([p = std::move(pro), partId, respGen = std::move(respGen)](
+            .then([p = std::move(pro), partId, respGen = std::move(respGen), host](
                 folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
                 // exception occurred during RPC
+                auto hostStr = network::NetworkUtils::intToIPv4(host.first);
                 if (t.hasException()) {
-                    p.setValue(Status::Error(folly::stringPrintf("RPC failure in AdminClient: %s",
-                                                                 t.exception().what().c_str())));
+                    p.setValue(Status::Error(folly::stringPrintf(
+                        "[%s:%d] RPC failure in AdminClient: %s",
+                        hostStr.c_str(),
+                        host.second,
+                        t.exception().what().c_str())));
                     return;
                 }
                 auto&& result = std::move(t).value().get_result();
diff --git a/src/meta/processors/admin/BalancePlan.cpp b/src/meta/processors/admin/BalancePlan.cpp
index 7993a5e557e..6ec75a7e22e 100644
--- a/src/meta/processors/admin/BalancePlan.cpp
+++ b/src/meta/processors/admin/BalancePlan.cpp
@@ -43,6 +43,10 @@ void BalancePlan::dispatchTasks() {
 
 void BalancePlan::invoke() {
     status_ = Status::IN_PROGRESS;
+    // Sort the tasks by their ids to ensure the order after recovery.
+    std::sort(tasks_.begin(), tasks_.end(), [](auto& l, auto& r) {
+        return l.taskIdStr() < r.taskIdStr();
+    });
     dispatchTasks();
     for (size_t i = 0; i < buckets_.size(); i++) {
         for (size_t j = 0; j < buckets_[i].size(); j++) {
diff --git a/src/meta/processors/admin/BalanceTask.cpp b/src/meta/processors/admin/BalanceTask.cpp
index bbf69c11924..fcff4000dbb 100644
--- a/src/meta/processors/admin/BalanceTask.cpp
+++ b/src/meta/processors/admin/BalanceTask.cpp
@@ -43,10 +43,20 @@ void BalanceTask::invoke() {
     }
     switch (status_) {
         case Status::START: {
-            LOG(INFO) << taskIdStr_ << "Start to move part!";
-            status_ = Status::CHANGE_LEADER;
+            LOG(INFO) << taskIdStr_ << "Start to move part, check the peers first!";
             ret_ = Result::IN_PROGRESS;
             startTimeMs_ = time::WallClock::fastNowInMilliSec();
+            SAVE_STATE();
+            client_->checkPeers(spaceId_, partId_).thenValue([this] (auto&& resp) {
+                if (!resp.ok()) {
+                    LOG(INFO) << taskIdStr_ << "Check the peers failed, status " << resp;
+                    ret_ = Result::FAILED;
+                } else {
+                    status_ = Status::CHANGE_LEADER;
+                }
+                invoke();
+            });
+            break;
         }
         // fallthrough
         case Status::CHANGE_LEADER: {
diff --git a/src/storage/admin/AdminProcessor.h b/src/storage/admin/AdminProcessor.h
index 2720c1cf965..19a7a9f074c 100644
--- a/src/storage/admin/AdminProcessor.h
+++ b/src/storage/admin/AdminProcessor.h
@@ -138,7 +138,12 @@ class AddPartProcessor : public BaseProcessor<cpp2::AdminExecResp> {
             LOG(INFO) << "Space " << spaceId << " not exist, create it!";
             store->addSpace(spaceId);
         }
-        store->addPart(spaceId, partId, req.get_as_learner());
+        std::vector<HostAddr> peers;
+        for (auto& p : req.get_peers()) {
+            peers.emplace_back(
+                kvstore::NebulaStore::getRaftAddr(HostAddr(p.get_ip(), p.get_port())));
+        }
+        store->addPart(spaceId, partId, req.get_as_learner(), peers);
         onFinished();
     }
 

From 3420d77de3f5d595b003ec1486e31f8c94d43af7 Mon Sep 17 00:00:00 2001
From: heng
Date: Fri, 31 Jul 2020 13:54:03 +0800
Subject: [PATCH 2/2] Fix bug about catchup

---
 src/kvstore/raftex/RaftPart.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index f670c392dc1..5699625fbe4 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -1837,7 +1837,8 @@ AppendLogResult RaftPart::isCatchedUp(const HostAddr& peer) {
     }
     for (auto& host : hosts_) {
        if (host->addr_ == peer) {
-            if (host->followerCommittedLogId_ < wal_->firstLogId()) {
+            if (host->followerCommittedLogId_ == 0
+                || host->followerCommittedLogId_ < wal_->firstLogId()) {
                 LOG(INFO) << idStr_ << "The committed log id of peer is "
                           << host->followerCommittedLogId_
                           << ", which is invalid or less than my first wal log id";
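
For readers who want to trace the behaviour introduced by this series, below is a minimal, self-contained C++ sketch (illustration only, not part of the patch: the stand-in structs, the partExists flag and the printed messages are invented) of the decision flow NebulaStore::addPart() ends up with — explicit peers carried in AddPartReq take priority over the meta service, and a part that already exists now has its peers checked instead of being silently skipped.

// Illustrative sketch only; simplified stand-ins for the real thrift/raft types.
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

using HostAddr = std::pair<uint32_t, uint32_t>;   // (ip, port), as in kvstore

struct AddPartReq {                                // stand-in for the thrift struct
    int32_t spaceId;
    int32_t partId;
    bool asLearner;
    std::vector<HostAddr> peers;                   // the field added by this patch
};

// Mimics the branching of NebulaStore::addPart() after the change.
void addPart(int32_t spaceId, int32_t partId, bool asLearner,
             const std::vector<HostAddr>& peers, bool partExists) {
    if (partExists) {
        // Previously the call just returned here; now explicit peers trigger a check.
        if (!peers.empty()) {
            std::cout << "part [" << spaceId << "," << partId << "] exists, check/reset peers\n";
        }
        return;
    }
    if (peers.empty()) {
        std::cout << "no peers in the request, pull them from the meta service\n";
    } else {
        std::cout << "use the " << peers.size() << " peers carried in the request\n";
    }
    std::cout << "part [" << spaceId << "," << partId << "] added, asLearner="
              << std::boolalpha << asLearner << "\n";
}

int main() {
    AddPartReq req{1, 7, false, {{2130706433, 44500}, {2130706434, 44500}}};
    addPart(req.spaceId, req.partId, req.asLearner, req.peers, /*partExists=*/false);
    addPart(req.spaceId, req.partId, req.asLearner, req.peers, /*partExists=*/true);
    return 0;
}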