From fe82c25413e092ebff358f631df007a9fcfa4b16 Mon Sep 17 00:00:00 2001 From: heng Date: Fri, 24 May 2019 14:40:51 +0800 Subject: [PATCH] Implement balance logic in meta --- src/common/time/TimeUtils.h | 1 + src/interface/meta.thrift | 15 ++ src/interface/storage.thrift | 24 +++ src/meta/CMakeLists.txt | 7 +- src/meta/MetaServiceHandler.cpp | 6 + src/meta/MetaServiceHandler.h | 3 + src/meta/client/MetaClient.cpp | 9 + src/meta/client/MetaClient.h | 4 + src/meta/processors/BaseProcessor.h | 21 +-- src/meta/processors/Common.h | 39 ++++ src/meta/processors/admin/AdminClient.cpp | 96 ++++++++++ src/meta/processors/admin/AdminClient.h | 69 +++++++ src/meta/processors/admin/BalancePlan.cpp | 55 ++++++ src/meta/processors/admin/BalancePlan.h | 53 ++++++ .../processors/admin/BalanceProcessor.cpp | 49 +++++ src/meta/processors/admin/BalanceProcessor.h | 34 ++++ src/meta/processors/admin/BalanceTask.cpp | 137 ++++++++++++++ src/meta/processors/admin/BalanceTask.h | 82 +++++++++ src/meta/processors/admin/Balancer.cpp | 170 ++++++++++++++++++ src/meta/processors/admin/Balancer.h | 92 ++++++++++ src/meta/processors/admin/HBProcessor.cpp | 1 + src/meta/test/BalancerTest.cpp | 102 +++++++++++ src/meta/test/CMakeLists.txt | 31 +++- 23 files changed, 1078 insertions(+), 22 deletions(-) create mode 100644 src/meta/processors/Common.h create mode 100644 src/meta/processors/admin/AdminClient.cpp create mode 100644 src/meta/processors/admin/AdminClient.h create mode 100644 src/meta/processors/admin/BalancePlan.cpp create mode 100644 src/meta/processors/admin/BalancePlan.h create mode 100644 src/meta/processors/admin/BalanceProcessor.cpp create mode 100644 src/meta/processors/admin/BalanceProcessor.h create mode 100644 src/meta/processors/admin/BalanceTask.cpp create mode 100644 src/meta/processors/admin/BalanceTask.h create mode 100644 src/meta/processors/admin/Balancer.cpp create mode 100644 src/meta/processors/admin/Balancer.h create mode 100644 src/meta/test/BalancerTest.cpp diff 
--git a/src/common/time/TimeUtils.h b/src/common/time/TimeUtils.h index 387134923cb..66fde102e92 100644 --- a/src/common/time/TimeUtils.h +++ b/src/common/time/TimeUtils.h @@ -7,6 +7,7 @@ #ifndef COMMON_TIME_TIMEUTILS_H_ #define COMMON_TIME_TIMEUTILS_H_ +#include "base/Base.h" #include namespace nebula { diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index a980f44148d..02a1710c243 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -26,6 +26,7 @@ enum ErrorCode { E_NOT_FOUND = -23, E_INVALID_HOST = -24, E_UNSUPPORTED = -25, + E_BALANCER_RUNNING = -26, // KV Failure E_STORE_FAILURE = -31, @@ -311,6 +312,19 @@ struct HBReq { 1: common.HostAddr host, } +struct BalanceReq { + 1: optional common.GraphSpaceID space_id, + // Specify the balance id to check the status of the related balance plan + 2: optional i64 id, +} + +struct BalanceResp { + 1: ErrorCode code, + 2: i64 id, + // Valid if code equals E_LEADER_CHANGED. + 3: common.HostAddr leader, +} + service MetaService { ExecResp createSpace(1: CreateSpaceReq req); ExecResp dropSpace(1: DropSpaceReq req); @@ -343,5 +357,6 @@ service MetaService { ScanResp scan(1: ScanReq req); HBResp heartBeat(1: HBReq req); + BalanceResp balance(1: BalanceReq req); } diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index fafa4cf21d0..486fde9c96f 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -160,6 +160,25 @@ struct AddEdgesRequest { 3: bool overwritable, } +struct AdminExecResp { + +} + +struct AddPartReq { + 1: common.GraphSpaceID space_id, + 2: common.PartitionID part_id, +} + +struct RemovePartReq { + 1: common.GraphSpaceID space_id, + 2: common.PartitionID part_id, +} + +struct MemberChangeReq { + 1: common.GraphSpaceID space_id, + 2: common.PartitionID part_id, +} + service StorageService { QueryResponse getOutBound(1: GetNeighborsRequest req) QueryResponse getInBound(1: GetNeighborsRequest req) @@ -173,5 +192,10 @@ service 
StorageService { ExecResponse addVertices(1: AddVerticesRequest req); ExecResponse addEdges(1: AddEdgesRequest req); + + // Interfaces for admin operations + AdminExecResp addPart(1: AddPartReq req); + AdminExecResp removePart(1: RemovePartReq req); + AdminExecResp memberChange(1: MemberChangeReq req); } diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 6f0e1fcf61f..c04788d81f7 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -37,7 +37,13 @@ add_library( processors/customKV/RemoveRangeProcessor.cpp processors/customKV/ScanProcessor.cpp processors/admin/HBProcessor.cpp + processors/admin/BalanceProcessor.cpp + processors/admin/Balancer.cpp + processors/admin/BalancePlan.cpp + processors/admin/BalanceTask.cpp + processors/admin/AdminClient.cpp ) + add_dependencies( meta_service_handler base_obj @@ -68,5 +74,4 @@ add_dependencies( common_thrift_obj schema_obj) - add_subdirectory(test) diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index 8eea9834799..b5125273fe4 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -31,6 +31,7 @@ #include "meta/processors/customKV/RemoveProcessor.h" #include "meta/processors/customKV/RemoveRangeProcessor.h" #include "meta/processors/admin/HBProcessor.h" +#include "meta/processors/admin/BalanceProcessor.h" #define RETURN_FUTURE(processor) \ auto f = processor->getFuture(); \ @@ -190,5 +191,10 @@ MetaServiceHandler::future_heartBeat(const cpp2::HBReq& req) { RETURN_FUTURE(processor); } +folly::Future +MetaServiceHandler::future_balance(const cpp2::BalanceReq& req) { + auto* processor = BalanceProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} } // namespace meta } // namespace nebula diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 9f7b0130d74..dcb698ac233 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -107,6 +107,9 @@ class MetaServiceHandler final : public 
cpp2::MetaServiceSvIf { folly::Future future_heartBeat(const cpp2::HBReq& req) override; + folly::Future + future_balance(const cpp2::BalanceReq& req) override; + private: kvstore::KVStore* kvstore_ = nullptr; }; diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 949775b5e79..6b5233222eb 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -934,5 +934,14 @@ folly::Future> MetaClient::heartbeat() { return resp.code == cpp2::ErrorCode::SUCCEEDED; }, true); } + +folly::Future> MetaClient::balance() { + cpp2::BalanceReq req; + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_balance(request); + }, [] (cpp2::BalanceResp&& resp) -> int64_t { + return resp.id; + }, true); +} } // namespace meta } // namespace nebula diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index 886df0a025c..663e303cdce 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -157,6 +157,10 @@ class MetaClient { folly::Future> removeRange(std::string segment, std::string start, std::string end); + // Operations for admin + folly::Future> + balance(); + // Opeartions for cache. 
StatusOr getSpaceIdByNameFromCache(const std::string& name); diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index 3dcccc83f9b..00d563fb797 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -17,32 +17,13 @@ #include "meta/MetaServiceUtils.h" #include "meta/common/MetaCommon.h" #include "network/NetworkUtils.h" +#include "meta/processors/Common.h" namespace nebula { namespace meta { using nebula::network::NetworkUtils; -const PartitionID kDefaultPartId = 0; -const GraphSpaceID kDefaultSpaceId = 0; - -class LockUtils { -public: - LockUtils() = delete; -#define GENERATE_LOCK(Entry) \ - static folly::SharedMutex& Entry##Lock() { \ - static folly::SharedMutex l; \ - return l; \ - } - -GENERATE_LOCK(space); -GENERATE_LOCK(id); -GENERATE_LOCK(tag); -GENERATE_LOCK(edge); - -#undef GENERATE_LOCK -}; - #define CHECK_SPACE_ID_AND_RETURN(spaceID) \ if (spaceExist(spaceID) == Status::SpaceNotFound()) { \ resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \ diff --git a/src/meta/processors/Common.h b/src/meta/processors/Common.h new file mode 100644 index 00000000000..f8e81fe3181 --- /dev/null +++ b/src/meta/processors/Common.h @@ -0,0 +1,39 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +#ifndef META_PROCESSORS_COMMON_H_ +#define META_PROCESSORS_COMMON_H_ + +#include "base/Base.h" + +namespace nebula { +namespace meta { + +static const PartitionID kDefaultPartId = 0; +static const GraphSpaceID kDefaultSpaceId = 0; + +class LockUtils { +public: + LockUtils() = delete; +#define GENERATE_LOCK(Entry) \ + static folly::SharedMutex& Entry##Lock() { \ + static folly::SharedMutex l; \ + return l; \ + } + +GENERATE_LOCK(space); +GENERATE_LOCK(id); +GENERATE_LOCK(tag); +GENERATE_LOCK(edge); + +#undef GENERATE_LOCK +}; + + +} // namespace meta +} // namespace nebula +#endif // META_PROCESSORS_COMMON_H_ + diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp new file mode 100644 index 00000000000..1014893b7b6 --- /dev/null +++ b/src/meta/processors/admin/AdminClient.cpp @@ -0,0 +1,96 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +#include "meta/processors/admin/AdminClient.h" + +namespace nebula { +namespace meta { + +folly::Future AdminClient::transLeader(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& leader, + const HostAddr& dst) { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(leader); + UNUSED(dst); + if (injector_) { + return injector_->transLeader(); + } + return Status::OK(); +} + +folly::Future AdminClient::addPart(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& host, + bool asLearner) { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(host); + UNUSED(asLearner); + if (injector_) { + return injector_->addPart(); + } + return Status::OK(); +} + +folly::Future AdminClient::addLearner(GraphSpaceID spaceId, PartitionID partId) { + UNUSED(spaceId); + UNUSED(partId); + if (injector_) { + return injector_->addLearner(); + } + return Status::OK(); +} + +folly::Future AdminClient::waitingForCatchUpData(GraphSpaceID spaceId, + PartitionID partId) { + UNUSED(spaceId); + UNUSED(partId); + if (injector_) { + return injector_->waitingForCatchUpData(); + } + return Status::OK(); +} + +folly::Future AdminClient::memberChange(GraphSpaceID spaceId, PartitionID partId) { + UNUSED(spaceId); + UNUSED(partId); + if (injector_) { + return injector_->memberChange(); + } + return Status::OK(); +} + +folly::Future AdminClient::updateMeta(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& leader, + const HostAddr& dst) { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(leader); + UNUSED(dst); + if (injector_) { + return injector_->updateMeta(); + } + return Status::OK(); +} + +folly::Future AdminClient::removePart(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& host) { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(host); + if (injector_) { + return injector_->removePart(); + } + return Status::OK(); +} + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h new file 
mode 100644 index 00000000000..47758d7c051 --- /dev/null +++ b/src/meta/processors/admin/AdminClient.h @@ -0,0 +1,71 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_ +#define META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_ + +#include "base/Base.h" +#include +#include "base/Status.h" +#include "thrift/ThriftClientManager.h" + +namespace nebula { +namespace meta { + +class FaultInjector { +public: + // Virtual so implementations are destroyed correctly via the unique_ptr held by AdminClient. + virtual ~FaultInjector() = default; + virtual Status transLeader() = 0; + virtual Status addPart() = 0; + virtual Status addLearner() = 0; + virtual Status waitingForCatchUpData() = 0; + virtual Status memberChange() = 0; + virtual Status updateMeta() = 0; + virtual Status removePart() = 0; +}; + +class AdminClient { +public: + AdminClient() = default; + + explicit AdminClient(std::unique_ptr injector) + : injector_(std::move(injector)) {} + + ~AdminClient() = default; + + folly::Future transLeader(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& leader, + const HostAddr& dst); + + folly::Future addPart(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& host, + bool asLearner); + + folly::Future addLearner(GraphSpaceID spaceId, PartitionID partId); + + folly::Future waitingForCatchUpData(GraphSpaceID spaceId, PartitionID partId); + + folly::Future memberChange(GraphSpaceID spaceId, PartitionID partId); + + folly::Future updateMeta(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& leader, + const HostAddr& dst); + + folly::Future removePart(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& host); + +private: + std::unique_ptr injector_{nullptr}; +}; +} // namespace meta +} // namespace nebula + +#endif // META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_ diff --git a/src/meta/processors/admin/BalancePlan.cpp b/src/meta/processors/admin/BalancePlan.cpp new file
mode 100644 index 00000000000..75f3037f612 --- /dev/null +++ b/src/meta/processors/admin/BalancePlan.cpp @@ -0,0 +1,60 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "meta/processors/admin/BalancePlan.h" + +namespace nebula { +namespace meta { + +void BalancePlan::invoke() { + // An empty plan has no task callbacks to trigger onFinished_, so finish it here. + if (tasks_.empty()) { + onFinished_(); + return; + } + // TODO(heng) we want tasks for the same part to be invoked serially. + for (auto& task : tasks_) { + task.invoke(); + } +} + +void BalancePlan::registerTaskCb() { + for (auto& task : tasks_) { + task.onFinished_ = [this]() { + bool finished = false; + { + std::lock_guard lg(lock_); + finishedTaskNum_++; + if (finishedTaskNum_ == tasks_.size()) { + finished = true; + } + } + if (finished) { + onFinished_(); + } + }; + task.onError_ = [this]() { + bool finished = false; + { + std::lock_guard lg(lock_); + finishedTaskNum_++; + if (finishedTaskNum_ == tasks_.size()) { + finished = true; + } + } + if (finished) { + onFinished_(); + } + }; + } +} + +void BalancePlan::saveInStore() { +} + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/admin/BalancePlan.h b/src/meta/processors/admin/BalancePlan.h new file mode 100644 index 00000000000..0d19e867895 --- /dev/null +++ b/src/meta/processors/admin/BalancePlan.h @@ -0,0 +1,53 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */ + +#ifndef META_ADMIN_BALANCEPLAN_H_ +#define META_ADMIN_BALANCEPLAN_H_ + +#include +#include "kvstore/KVStore.h" +#include "meta/processors/admin/BalanceTask.h" + +namespace nebula { +namespace meta { + +class BalancePlan { + friend class Balancer; +public: + explicit BalancePlan(int64_t id, kvstore::KVStore* kv) + : id_(id) + , kv_(kv) {} + + void addTask(BalanceTask task) { + tasks_.emplace_back(std::move(task)); + } + + /** + * This method should be called after all tasks have been added to the plan. + * */ + void registerTaskCb(); + + void invoke(); + + void saveInStore(); + + int64_t id() const { + return id_; + } + +private: + int64_t id_ = 0; + kvstore::KVStore* kv_ = nullptr; + std::vector tasks_; + std::mutex lock_; + size_t finishedTaskNum_ = 0; + std::function onFinished_; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_ADMIN_BALANCEPLAN_H_ diff --git a/src/meta/processors/admin/BalanceProcessor.cpp b/src/meta/processors/admin/BalanceProcessor.cpp new file mode 100644 index 00000000000..93cd9ce9826 --- /dev/null +++ b/src/meta/processors/admin/BalanceProcessor.cpp @@ -0,0 +1,49 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */ + + +#include "meta/processors/admin/BalanceProcessor.h" +#include "meta/processors/admin/Balancer.h" + +namespace nebula { +namespace meta { + +void BalanceProcessor::process(const cpp2::BalanceReq& req) { + if (req.get_space_id() != nullptr) { + LOG(ERROR) << "Unsupport balance for specal space " << *req.get_space_id(); + resp_.set_code(cpp2::ErrorCode::E_UNSUPPORTED); + onFinished(); + return; + } + if (req.get_id() != nullptr) { + LOG(ERROR) << "Unsupport show status for special balance plan, id=" << *req.get_id(); + resp_.set_code(cpp2::ErrorCode::E_UNSUPPORTED); + onFinished(); + return; + } + auto hosts = ActiveHostsMan::instance()->getActiveHosts(); + if (hosts.empty()) { + LOG(ERROR) << "There is no active hosts"; + resp_.set_code(cpp2::ErrorCode::E_NO_HOSTS); + onFinished(); + return; + } + auto ret = Balancer::instance(kvstore_)->balance(); + if (!ret.ok()) { + LOG(INFO) << "The balancer is running."; + resp_.set_code(cpp2::ErrorCode::E_BALANCER_RUNNING); + onFinished(); + return; + } + resp_.set_id(ret.value()); + resp_.set_code(cpp2::ErrorCode::SUCCEEDED); + onFinished(); +} + + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/admin/BalanceProcessor.h b/src/meta/processors/admin/BalanceProcessor.h new file mode 100644 index 00000000000..aaea85fcc9c --- /dev/null +++ b/src/meta/processors/admin/BalanceProcessor.h @@ -0,0 +1,34 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +#ifndef META_BALANCEPROCESSOR_H_ +#define META_BALANCEPROCESSOR_H_ + +#include +#include "meta/processors/BaseProcessor.h" +#include "meta/ActiveHostsMan.h" +#include "time/TimeUtils.h" + +namespace nebula { +namespace meta { + +class BalanceProcessor : public BaseProcessor { +public: + static BalanceProcessor* instance(kvstore::KVStore* kvstore) { + return new BalanceProcessor(kvstore); + } + + void process(const cpp2::BalanceReq& req); + +private: + explicit BalanceProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; + +} // namespace meta +} // namespace nebula + +#endif // META_BALANCEPROCESSOR_H_ diff --git a/src/meta/processors/admin/BalanceTask.cpp b/src/meta/processors/admin/BalanceTask.cpp new file mode 100644 index 00000000000..5f1681c50a8 --- /dev/null +++ b/src/meta/processors/admin/BalanceTask.cpp @@ -0,0 +1,137 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +#include "meta/processors/admin/BalanceTask.h" + +namespace nebula { +namespace meta { + +void BalanceTask::invoke() { + if (ret_ == Result::FAILED) { + saveInStore(); + onError_(); + return; + } + switch (status_) { + case Status::START: { + LOG(INFO) << taskId_ << "Start to move part!"; + status_ = Status::CHANGE_LEADER; + ret_ = Result::IN_PROGRESS; + startTimeMs_ = time::TimeUtils::nowInSeconds(); + invoke(); + break; + } + case Status::CHANGE_LEADER: { + LOG(INFO) << taskId_ << "Ask the src to give up the leadership."; + saveInStore(); + client_->transLeader(spaceId_, partId_, src_, dst_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::ADD_PART_ON_DST; + } + invoke(); + }); + break; + } + case Status::ADD_PART_ON_DST: { + LOG(INFO) << taskId_ << "Open the part as leaner on dst."; + saveInStore(); + client_->addPart(spaceId_, partId_, dst_, true).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::ADD_LEARNER; + } + invoke(); + }); + break; + } + case Status::ADD_LEARNER: { + LOG(INFO) << taskId_ << "Add learner dst."; + saveInStore(); + client_->addLearner(spaceId_, partId_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::CATCH_UP_DATA; + } + invoke(); + }); + break; + } + case Status::CATCH_UP_DATA: { + LOG(INFO) << taskId_ << "Waiting for the data catch up."; + saveInStore(); + client_->waitingForCatchUpData(spaceId_, partId_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::MEMBER_CHANGE; + } + invoke(); + }); + break; + } + case Status::MEMBER_CHANGE: { + LOG(INFO) << taskId_ << "Send member change request to the leader" + << ", it will add the new member on dst host" + << " and remove the old member on src host."; + saveInStore(); + client_->memberChange(spaceId_, partId_).thenValue([this](auto&& resp) { + if 
(!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::UPDATE_PART_META; + } + invoke(); + }); + break; + } + case Status::UPDATE_PART_META: { + LOG(INFO) << taskId_ << "Update meta for part."; + saveInStore(); + client_->updateMeta(spaceId_, partId_, src_, dst_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::REMOVE_PART_ON_SRC; + } + invoke(); + }); + break; + } + case Status::REMOVE_PART_ON_SRC: { + LOG(INFO) << taskId_ << "Close part on src host."; + saveInStore(); + client_->removePart(spaceId_, partId_, src_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + ret_ = Result::SUCCECCED; + status_ = Status::END; + } + invoke(); + }); + break; + } + case Status::END: { + LOG(INFO) << taskId_ << "Part has been moved successfully!"; + endTimeMs_ = time::TimeUtils::nowInSeconds(); + saveInStore(); + onFinished_(); + break; + } + } + return; +} + +void BalanceTask::saveInStore() { +} + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/admin/BalanceTask.h b/src/meta/processors/admin/BalanceTask.h new file mode 100644 index 00000000000..ea95d715343 --- /dev/null +++ b/src/meta/processors/admin/BalanceTask.h @@ -0,0 +1,82 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +#ifndef META_ADMIN_BALANCETASK_H_ +#define META_ADMIN_BALANCETASK_H_ + +#include +#include "meta/ActiveHostsMan.h" +#include "time/TimeUtils.h" +#include "kvstore/KVStore.h" +#include "network/NetworkUtils.h" +#include "meta/processors/admin/AdminClient.h" + +namespace nebula { +namespace meta { + +class BalanceTask { + friend class BalancePlan; + FRIEND_TEST(BalanceTaskTest, SimpleTest); + +public: + BalanceTask(PartitionID partId, + const HostAddr& src, + const HostAddr& dst, + kvstore::KVStore* kv) + : partId_(partId) + , src_(src) + , dst_(dst) + , taskId_(folly::stringPrintf("[%d,%s:%d->%s:%d] ", + partId, + network::NetworkUtils::intToIPv4(src.first).c_str(), + src.second, + network::NetworkUtils::intToIPv4(dst.first).c_str(), + dst.second)) + , kv_(kv) {} + + void invoke(); + +private: + void saveInStore(); + +private: + enum class Status : uint8_t { + START = 0x01, + CHANGE_LEADER = 0x02, + ADD_PART_ON_DST = 0x03, + ADD_LEARNER = 0x04, + CATCH_UP_DATA = 0x05, + MEMBER_CHANGE = 0x06, + UPDATE_PART_META = 0x07, + REMOVE_PART_ON_SRC = 0x08, + END = 0xFF, + }; + + enum class Result : uint8_t { + SUCCECCED = 0x01, + FAILED = 0x02, + IN_PROGRESS = 0x03, + }; + + GraphSpaceID spaceId_ = 0; + PartitionID partId_; + HostAddr src_; + HostAddr dst_; + std::string taskId_; + kvstore::KVStore* kv_ = nullptr; + AdminClient* client_ = nullptr; + Status status_ = Status::START; + Result ret_ = Result::IN_PROGRESS; + int64_t startTimeMs_ = 0; + int64_t endTimeMs_ = 0; + std::function onFinished_; + std::function onError_; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_ADMIN_BALANCETASK_H_ diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp new file mode 100644 index 00000000000..55e69bd0e30 --- /dev/null +++ b/src/meta/processors/admin/Balancer.cpp @@ -0,0 +1,170 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "meta/processors/admin/Balancer.h" +#include "meta/processors/Common.h" +#include "meta/ActiveHostsMan.h" +#include "meta/MetaServiceUtils.h" + +namespace nebula { +namespace meta { + +StatusOr Balancer::balance() { + bool expected = false; + if (running_.compare_exchange_strong(expected, true)) { + plan_ = buildBalancePlan(); + if (plan_ == nullptr) { + return Status::Error("balance failed"); + } + bgThread_.addTask(&BalancePlan::invoke, plan_.get()); + return plan_->id(); + } + return Status::Error("balance running"); +} + +bool Balancer::recovery() { + // TODO(heng) recovery the balance plan from kvstore. + return true; +} + +std::unique_ptr Balancer::buildBalancePlan() { + std::vector spaces; + { + // Get all spaces + folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + auto prefix = MetaServiceUtils::spacePrefix(); + std::unique_ptr iter; + auto ret = kv_->prefix(0, 0, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "Rocksdb failed?"; + running_ = false; + return nullptr; + } + while (iter->valid()) { + auto spaceId = MetaServiceUtils::spaceId(iter->key()); + spaces.push_back(spaceId); + iter->next(); + } + } + auto plan = std::make_unique(time::TimeUtils::nowInSeconds(), kv_); + for (auto spaceId : spaces) { + auto tasks = genTasks(spaceId); + for (auto& task : tasks) { + plan->addTask(std::move(task)); + } + } + plan->onFinished_ = [this] () { + bool expected = true; + CHECK(running_.compare_exchange_strong(expected, false)); + }; + plan->registerTaskCb(); + plan->saveInStore(); + return plan; +} + +std::vector Balancer::genTasks(GraphSpaceID spaceId) { + std::unordered_map> hostParts; + int32_t totalParts = 0; + getHostParts(spaceId, hostParts, totalParts); + if (totalParts == 0 || hostParts.empty()) { + LOG(ERROR) << "Invalid space " << spaceId; + return 
std::vector(); + } + auto activeHosts = ActiveHostsMan::instance()->getActiveHosts(); + std::vector newlyAdded; + std::vector lost; + calDiff(hostParts, activeHosts, newlyAdded, lost); + decltype(hostParts) newHostParts(hostParts); + for (auto& h : newlyAdded) { + newHostParts.emplace(h, std::vector()); + } + for (auto& h : lost) { + newHostParts.erase(h); + } + LOG(INFO) << "Now, try to balance the newHostParts"; + float avgLoad = static_cast(totalParts)/newHostParts.size(); + LOG(INFO) << "The expect avg load is " << avgLoad << " for space " << spaceId; + // We have two kinds of parts to balance: the first one is parts on lost hosts. + // The second one is parts on unbalanced hosts in newHostParts. + std::vector tasks; + for (auto& h : lost) { + auto& lostParts = hostParts[h]; + for (auto& partId : lostParts) { + auto ret = pickupHost(newHostParts, partId); + if (!ret.ok()) { + LOG(ERROR) << "Error:" << ret.status(); + return std::vector(); + } + auto& luckyHost = ret.value(); + newHostParts[luckyHost].emplace_back(partId); + tasks.emplace_back(partId, h, luckyHost, kv_); + } + } + // TODO(heng): balance the unbalanced host.
+ return tasks; +} + +void Balancer::getHostParts(GraphSpaceID spaceId, + std::unordered_map>& hostParts, + int32_t& totalParts) { + folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + auto prefix = MetaServiceUtils::partPrefix(spaceId); + std::unique_ptr iter; + auto ret = kv_->prefix(0, 0, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId; + return; + } + while (iter->valid()) { + auto key = iter->key(); + PartitionID partId; + memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID)); + auto partHosts = MetaServiceUtils::parsePartVal(iter->val()); + for (auto& ph : partHosts) { + hostParts[HostAddr(ph.ip, ph.port)].emplace_back(partId); + } + iter->next(); + totalParts++; + } +} + +void Balancer::calDiff(const std::unordered_map>& hostParts, + const std::vector& activeHosts, + std::vector& newlyAdded, + std::vector& lost) { + for (auto it = hostParts.begin(); it != hostParts.end(); it++) { + if (std::find(activeHosts.begin(), activeHosts.end(), it->first) == activeHosts.end()) { + lost.emplace_back(it->first); + } + } + for (auto& h : activeHosts) { + if (hostParts.find(h) == hostParts.end()) { + newlyAdded.emplace_back(h); + } + } +} + +StatusOr Balancer::pickupHost( + const std::unordered_map>& hostParts, + PartitionID partId) { + std::vector> hosts; + for (auto it = hostParts.begin(); it != hostParts.end(); it++) { + if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end()) { + hosts.emplace_back(it->first, it->second.size()); + } + } + if (hosts.empty()) { + return Status::Error("No host is suitable for %d", partId); + } + std::sort(hosts.begin(), hosts.end(), [](const auto& l, const auto& r) { + return l.second < r.second; + }); + return hosts[0].first; +} + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/admin/Balancer.h b/src/meta/processors/admin/Balancer.h new file mode 100644 index 00000000000..b8e9ad20b02 
--- /dev/null +++ b/src/meta/processors/admin/Balancer.h @@ -0,0 +1,92 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_ADMIN_BALANCERR_H_ +#define META_ADMIN_BALANCERR_H_ + +#include +#include "time/TimeUtils.h" +#include "kvstore/KVStore.h" +#include "network/NetworkUtils.h" +#include "meta/processors/admin/AdminClient.h" +#include "meta/processors/admin/BalanceTask.h" +#include "meta/processors/admin/BalancePlan.h" + +namespace nebula { +namespace meta { +/** +1. Balance will generate balance plan according to current active hosts and parts allocation +2. For the plan, we hope after moving the least parts, it will reach a reasonable state. +3. Only one balance plan could be invoked at the same time. +4. Each balance plan has one id, and we could show the status by "balance id" command + and after FO, we could go on the balance plan by typing "balance id" +5. Each balance plan contains many balance tasks, the task represents the minimum movement unit. +6. We save the whole balancePlan state in kvstore to do failover. +7. Each balance task contains several steps. And it should be executed step by step. +8. One failed task will cause the whole balance plan to fail. +9. Currently, we hope tasks for the same part could be invoked serially + * */ +class Balancer { +public: + static Balancer* instance(kvstore::KVStore* kv) { + static std::unique_ptr balancer(new Balancer(kv)); + static std::once_flag initFlag; + std::call_once(initFlag, [&]() { + CHECK(balancer->recovery()); + }); + return balancer.get(); + } + + /* + * Return Error if reject the balance request, otherwise return balance id. + * */ + StatusOr balance(); + + /* + * When the balancer fails over, we should recover the status.
+ * */ + bool recovery(); + +private: + explicit Balancer(kvstore::KVStore* kv) + : kv_(kv) {} + + /* + * Execute the balance plan. The method is running in bgThread_. + */ + void invoke(BalancePlan plan); + + /** + * Build balance plan and save it in kvstore. + * */ + std::unique_ptr buildBalancePlan(); + + std::vector genTasks(GraphSpaceID spaceId); + + void getHostParts(GraphSpaceID spaceId, + std::unordered_map>& hostParts, + int32_t& totalParts); + + void calDiff(const std::unordered_map>& hostParts, + const std::vector& activeHosts, + std::vector& newlyAdded, + std::vector& lost); + + StatusOr pickupHost( + const std::unordered_map>& hostParts, + PartitionID partId); + +private: + std::atomic_bool running_{false}; + kvstore::KVStore* kv_ = nullptr; + thread::GenericWorker bgThread_; + std::unique_ptr plan_; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_ADMIN_BALANCERR_H_ diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index 54b7b61e565..5388323d7a1 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -32,6 +32,7 @@ void HBProcessor::process(const cpp2::HBReq& req) { HostInfo info; info.lastHBTimeInSec_ = time::TimeUtils::nowInSeconds(); ActiveHostsMan::instance()->updateHostInfo(host, info); + resp_.set_code(cpp2::ErrorCode::SUCCEEDED); onFinished(); } diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp new file mode 100644 index 00000000000..f4c9e30315f --- /dev/null +++ b/src/meta/test/BalancerTest.cpp @@ -0,0 +1,102 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */ +#include "base/Base.h" +#include +#include "meta/processors/admin/Balancer.h" + +namespace nebula { +namespace meta { + +class TestFaultInjector : public FaultInjector { /* Test double: every hook returns a preset Status read from a fixed slot of statusArray_. */ +public: + explicit TestFaultInjector(std::vector sts) /* sts needs at least 7 entries: the hooks read slots 0..6 without bounds checks */ + : statusArray_(std::move(sts)) {} + + Status transLeader() override { /* slot 0 */ + return statusArray_[0]; + } + + Status addPart() override { /* slot 1 */ + return statusArray_[1]; + } + + Status addLearner() override { /* slot 2 */ + return statusArray_[2]; + } + + Status waitingForCatchUpData() override { /* slot 3 */ + return statusArray_[3]; + } + + Status memberChange() override { /* slot 4 */ + return statusArray_[4]; + } + + Status updateMeta() override { /* slot 5 */ + return statusArray_[5]; + } + + Status removePart() override { /* slot 6 */ + return statusArray_[6]; + } + +private: + std::vector statusArray_; +}; + +TEST(BalanceTaskTest, SimpleTest) { /* Invokes a BalanceTask against injected statuses: once with all OK, once with transLeader failing. */ + { /* Case 1: all 7 slots are OK; the test expects SUCCECCED and END, and onError_ is fatal. */ + std::vector sts(7, Status::OK()); + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + auto client = std::make_unique(std::move(injector)); + BalanceTask task(0, HostAddr(0, 0), HostAddr(1, 1), nullptr); + task.onFinished_ = []() { + LOG(INFO) << "Task finished!"; + }; + task.onError_ = []() { + LOG(FATAL) << "We should not reach here!"; + }; + task.client_ = client.get(); + task.invoke(); + ASSERT_EQ(BalanceTask::Result::SUCCECCED, task.ret_); + ASSERT_EQ(BalanceTask::Status::END, task.status_); + } + { /* Case 2: slot 0 (transLeader) is an error; the test expects FAILED and CHANGE_LEADER, and onFinished_ is fatal. */ + std::vector sts{Status::Error("transLeader failed!"), + Status::OK(), + Status::OK(), + Status::OK(), + Status::OK(), + Status::OK(), + Status::OK()}; + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + auto client = std::make_unique(std::move(injector)); + BalanceTask task(0, HostAddr(0, 0), HostAddr(1, 1), nullptr); + task.onFinished_ = []() { + LOG(FATAL) << "We should not reach here!"; + }; + task.onError_ = []() { + LOG(INFO) << "Error happens!"; + }; + task.client_ = client.get(); + task.invoke(); + ASSERT_EQ(BalanceTask::Result::FAILED, task.ret_); + ASSERT_EQ(BalanceTask::Status::CHANGE_LEADER, task.status_); + } +}
+ +} // namespace meta +} // namespace nebula + + +int main(int argc, char** argv) { /* Test entry point: initializes gtest and folly, sends glog output at INFO to stderr, then runs all tests. */ + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 6b1dd211b61..2357ca663e9 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -137,7 +137,6 @@ nebula_link_libraries( ) nebula_add_test(active_hosts_man_test) - add_executable( meta_http_test MetaHttpHandlerTest.cpp @@ -171,3 +170,33 @@ nebula_link_libraries( gtest ) nebula_add_test(meta_http_test) + +add_executable( + balancer_test + BalancerTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +nebula_link_libraries( + balancer_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + gtest +) + +nebula_add_test(balancer_test) +