diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 155339b31b0..83d12bc7a3a 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -27,6 +27,7 @@ enum ErrorCode { E_INVALID_HOST = -24, E_UNSUPPORTED = -25, E_NOT_DROP = -26, + E_BALANCER_RUNNING = -27, // KV Failure E_STORE_FAILURE = -31, @@ -417,6 +418,19 @@ struct CheckPasswordReq { 2: string encoded_pwd, } +struct BalanceReq { + 1: optional common.GraphSpaceID space_id, + // Specify the balance id to check the status of the related balance plan + 2: optional i64 id, +} + +struct BalanceResp { + 1: ErrorCode code, + 2: i64 id, + // Valid if code equals E_LEADER_CHANGED. + 3: common.HostAddr leader, +} + service MetaService { ExecResp createSpace(1: CreateSpaceReq req); ExecResp dropSpace(1: DropSpaceReq req); @@ -448,8 +462,6 @@ service MetaService { ExecResp removeRange(1: RemoveRangeReq req); ScanResp scan(1: ScanReq req); - HBResp heartBeat(1: HBReq req); - ExecResp createUser(1: CreateUserReq req); ExecResp dropUser(1: DropUserReq req); ExecResp alterUser(1: AlterUserReq req); @@ -461,5 +473,7 @@ service MetaService { ExecResp changePassword(1: ChangePasswordReq req); ExecResp checkPassword(1: CheckPasswordReq req); + HBResp heartBeat(1: HBReq req); + BalanceResp balance(1: BalanceReq req); } diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index fafa4cf21d0..486fde9c96f 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -160,6 +160,25 @@ struct AddEdgesRequest { 3: bool overwritable, } +struct AdminExecResp { + +} + +struct AddPartReq { + 1: common.GraphSpaceID space_id, + 2: common.PartitionID part_id, +} + +struct RemovePartReq { + 1: common.GraphSpaceID space_id, + 2: common.PartitionID part_id, +} + +struct MemberChangeReq { + 1: common.GraphSpaceID space_id, + 2: common.PartitionID part_id, +} + service StorageService { QueryResponse getOutBound(1: GetNeighborsRequest req) QueryResponse getInBound(1: GetNeighborsRequest req) @@ -173,5 +192,10 @@ service StorageService { ExecResponse addVertices(1: AddVerticesRequest req); ExecResponse addEdges(1: AddEdgesRequest req); + + // Interfaces for admin operations + AdminExecResp addPart(1: AddPartReq req); + AdminExecResp removePart(1: RemovePartReq req); + AdminExecResp memberChange(1: MemberChangeReq req); } diff --git a/src/kvstore/test/CMakeLists.txt b/src/kvstore/test/CMakeLists.txt index e5b9f0f974c..84cf515526f 100644 --- a/src/kvstore/test/CMakeLists.txt +++ b/src/kvstore/test/CMakeLists.txt @@ -171,5 +171,5 @@ nebula_link_libraries( boost_regex ) -nebula_add_test(multi_versions_perf_test_bm) +# nebula_add_test(multi_versions_perf_test_bm) diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 7d35f54134f..1ebfbeaaab6 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -38,7 +38,13 @@ add_library( processors/customKV/ScanProcessor.cpp processors/admin/HBProcessor.cpp processors/usersMan/AuthenticationProcessor.cpp + processors/admin/BalanceProcessor.cpp + processors/admin/Balancer.cpp + processors/admin/BalancePlan.cpp + processors/admin/BalanceTask.cpp + processors/admin/AdminClient.cpp ) + add_dependencies( meta_service_handler base_obj @@ -69,5 +75,4 @@ add_dependencies( common_thrift_obj schema_obj) - add_subdirectory(test) diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index c3cde610ef0..64cec63742c 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -32,6 +32,7 @@ #include "meta/processors/customKV/RemoveRangeProcessor.h" #include "meta/processors/admin/HBProcessor.h" #include "meta/processors/usersMan/AuthenticationProcessor.h" +#include "meta/processors/admin/BalanceProcessor.h" #define RETURN_FUTURE(processor) \ auto f = processor->getFuture(); \ @@ -251,5 +252,10 @@ MetaServiceHandler::future_checkPassword(const cpp2::CheckPasswordReq& req) { RETURN_FUTURE(processor); } +folly::Future +MetaServiceHandler::future_balance(const cpp2::BalanceReq& req) { + auto* processor = BalanceProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} } // namespace meta } // namespace nebula diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index b679da93340..7d78a75ae7f 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -101,12 +101,6 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_listEdges(const cpp2::ListEdgesReq& req) override; - /** - * HeartBeat - * */ - folly::Future - future_heartBeat(const cpp2::HBReq& req) override; - /** * User manager **/ @@ -140,6 +134,15 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_checkPassword(const cpp2::CheckPasswordReq& req) override; + /** + * HeartBeat + * */ + folly::Future + future_heartBeat(const cpp2::HBReq& req) override; + + folly::Future + future_balance(const cpp2::BalanceReq& req) override; + private: kvstore::KVStore* kvstore_ = nullptr; }; diff --git a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp index ded55153426..7663f2c5e66 100644 --- a/src/meta/MetaServiceUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -20,6 +20,7 @@ const std::string kIndexTable = "__index__"; // NOLINT const std::string kUsersTable = "__users__"; // NOLINT const std::string kRolesTable = "__roles__"; // NOLINT + const std::string kHostOnline = "Online"; // NOLINT const std::string kHostOffline = "Offline"; // NOLINT diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 2e9fe2feaaa..206946ed09d 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -966,5 +966,14 @@ folly::Future> MetaClient::heartbeat() { return resp.code == cpp2::ErrorCode::SUCCEEDED; }, true); } + +folly::Future> MetaClient::balance() { + cpp2::BalanceReq req; + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_balance(request); + }, [] (cpp2::BalanceResp&& resp) -> int64_t { + return resp.id; + }, true); +} } // namespace meta } // namespace nebula diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index bafcc713d8b..e469c9b2628 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -159,7 +159,11 @@ class MetaClient { folly::Future> removeRange(std::string segment, std::string start, std::string end); - // Operations for cache. + // Operations for admin + folly::Future> + balance(); + + // Opeartions for cache. StatusOr getSpaceIdByNameFromCache(const std::string& name); StatusOr getTagIDByNameFromCache(const GraphSpaceID& space, const std::string& name); diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index 1ed61ef49bd..c000d2bcce3 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -17,33 +17,13 @@ #include "meta/MetaServiceUtils.h" #include "meta/common/MetaCommon.h" #include "network/NetworkUtils.h" +#include "meta/processors/Common.h" namespace nebula { namespace meta { using nebula::network::NetworkUtils; -const PartitionID kDefaultPartId = 0; -const GraphSpaceID kDefaultSpaceId = 0; - -class LockUtils { -public: - LockUtils() = delete; -#define GENERATE_LOCK(Entry) \ - static folly::SharedMutex& Entry##Lock() { \ - static folly::SharedMutex l; \ - return l; \ - } - -GENERATE_LOCK(space); -GENERATE_LOCK(id); -GENERATE_LOCK(tag); -GENERATE_LOCK(edge); -GENERATE_LOCK(user); - -#undef GENERATE_LOCK -}; - #define CHECK_SPACE_ID_AND_RETURN(spaceID) \ if (spaceExist(spaceID) == Status::SpaceNotFound()) { \ resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \ diff --git a/src/meta/processors/Common.h b/src/meta/processors/Common.h new file mode 100644 index 00000000000..9dcda6a4263 --- /dev/null +++ b/src/meta/processors/Common.h @@ -0,0 +1,42 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_PROCESSORS_COMMON_H_ +#define META_PROCESSORS_COMMON_H_ + +#include "base/Base.h" + +namespace nebula { +namespace meta { + +static const PartitionID kDefaultPartId = 0; +static const GraphSpaceID kDefaultSpaceId = 0; + +using BalanceID = int64_t; + +class LockUtils { +public: + LockUtils() = delete; +#define GENERATE_LOCK(Entry) \ + static folly::SharedMutex& Entry##Lock() { \ + static folly::SharedMutex l; \ + return l; \ + } + +GENERATE_LOCK(space); +GENERATE_LOCK(id); +GENERATE_LOCK(tag); +GENERATE_LOCK(edge); +GENERATE_LOCK(user); + +#undef GENERATE_LOCK +}; + + +} // namespace meta +} // namespace nebula +#endif // META_PROCESSORS_COMMON_H_ + diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp new file mode 100644 index 00000000000..1014893b7b6 --- /dev/null +++ b/src/meta/processors/admin/AdminClient.cpp @@ -0,0 +1,96 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "meta/processors/admin/AdminClient.h" + +namespace nebula { +namespace meta { + +folly::Future AdminClient::transLeader(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& leader, + const HostAddr& dst) { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(leader); + UNUSED(dst); + if (injector_) { + return injector_->transLeader(); + } + return Status::OK(); +} + +folly::Future AdminClient::addPart(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& host, + bool asLearner) { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(host); + UNUSED(asLearner); + if (injector_) { + return injector_->addPart(); + } + return Status::OK(); +} + +folly::Future AdminClient::addLearner(GraphSpaceID spaceId, PartitionID partId) { + UNUSED(spaceId); + UNUSED(partId); + if (injector_) { + return injector_->addLearner(); + } + return Status::OK(); +} + +folly::Future AdminClient::waitingForCatchUpData(GraphSpaceID spaceId, + PartitionID partId) { + UNUSED(spaceId); + UNUSED(partId); + if (injector_) { + return injector_->waitingForCatchUpData(); + } + return Status::OK(); +} + +folly::Future AdminClient::memberChange(GraphSpaceID spaceId, PartitionID partId) { + UNUSED(spaceId); + UNUSED(partId); + if (injector_) { + return injector_->memberChange(); + } + return Status::OK(); +} + +folly::Future AdminClient::updateMeta(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& leader, + const HostAddr& dst) { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(leader); + UNUSED(dst); + if (injector_) { + return injector_->updateMeta(); + } + return Status::OK(); +} + +folly::Future AdminClient::removePart(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& host) { + UNUSED(spaceId); + UNUSED(partId); + UNUSED(host); + if (injector_) { + return injector_->removePart(); + } + return Status::OK(); +} + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h new file mode 100644 index 00000000000..cd0e3f60df4 --- /dev/null +++ b/src/meta/processors/admin/AdminClient.h @@ -0,0 +1,74 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_ +#define META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_ + +#include "base/Base.h" +#include +#include "base/Status.h" +#include "thrift/ThriftClientManager.h" + +namespace nebula { +namespace meta { + +class FaultInjector { +public: + virtual ~FaultInjector() = default; + virtual folly::Future transLeader() = 0; + virtual folly::Future addPart() = 0; + virtual folly::Future addLearner() = 0; + virtual folly::Future waitingForCatchUpData() = 0; + virtual folly::Future memberChange() = 0; + virtual folly::Future updateMeta() = 0; + virtual folly::Future removePart() = 0; +}; + +class AdminClient { +public: + AdminClient() = default; + + explicit AdminClient(std::unique_ptr injector) + : injector_(std::move(injector)) {} + + ~AdminClient() = default; + + folly::Future transLeader(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& leader, + const HostAddr& dst); + + folly::Future addPart(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& host, + bool asLearner); + + folly::Future addLearner(GraphSpaceID spaceId, PartitionID partId); + + folly::Future waitingForCatchUpData(GraphSpaceID spaceId, PartitionID partId); + + folly::Future memberChange(GraphSpaceID spaceId, PartitionID partId); + + folly::Future updateMeta(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& leader, + const HostAddr& dst); + + folly::Future removePart(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& host); + + FaultInjector* faultInjector() { + return injector_.get(); + } + +private: + std::unique_ptr injector_{nullptr}; +}; +} // namespace meta +} // namespace nebula + +#endif // META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_ diff --git a/src/meta/processors/admin/BalancePlan.cpp b/src/meta/processors/admin/BalancePlan.cpp new file mode 100644 index 00000000000..58b92858cb3 --- /dev/null +++ b/src/meta/processors/admin/BalancePlan.cpp @@ -0,0 +1,198 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "meta/processors/admin/BalancePlan.h" +#include +#include "meta/processors/Common.h" + +DEFINE_uint32(task_concurrency, 10, "The tasks number could be invoked simultaneously"); + +namespace nebula { +namespace meta { + +const std::string kBalancePlanTable = "__b_plan__"; // NOLINT + +void BalancePlan::dispatchTasks() { + // Key -> spaceID + partID, Val -> List of task index in tasks_; + std::unordered_map, std::vector> partTasks; + int32_t index = 0; + for (auto& task : tasks_) { + partTasks[std::make_pair(task.spaceId_, task.partId_)].emplace_back(index++); + } + buckets_.resize(std::min(partTasks.size(), (size_t)FLAGS_task_concurrency)); + for (auto it = partTasks.begin(); it != partTasks.end(); it++) { + size_t minNum = tasks_.size(); + int32_t i = 0, minIndex = 0; + for (auto& bucket : buckets_) { + if (bucket.size() < minNum) { + minNum = bucket.size(); + minIndex = i; + } + i++; + } + for (auto taskIndex : it->second) { + buckets_[minIndex].emplace_back(taskIndex); + } + } +} + +void BalancePlan::invoke() { + status_ = Status::IN_PROGRESS; + dispatchTasks(); + for (size_t i = 0; i < buckets_.size(); i++) { + for (size_t j = 0; j < buckets_[i].size(); j++) { + auto taskIndex = buckets_[i][j]; + tasks_[taskIndex].onFinished_ = [this, i, j]() { + bool finished = false; + { + std::lock_guard lg(lock_); + finishedTaskNum_++; + if (finishedTaskNum_ == tasks_.size()) { + finished = true; + if (status_ == Status::IN_PROGRESS) { + status_ = Status::SUCCEEDED; + } + } + } + if (finished) { + CHECK_EQ(j, this->buckets_[i].size() - 1); + saveInStore(true); + onFinished_(); + } else { + if (j + 1 < this->buckets_[i].size()) { + auto& task = this->tasks_[this->buckets_[i][j + 1]]; + task.invoke(); + } + } + }; // onFinished + tasks_[taskIndex].onError_ = [this, i, j]() { + bool finished = false; + { + std::lock_guard lg(lock_); + finishedTaskNum_++; + status_ = Status::FAILED; + if (finishedTaskNum_ == tasks_.size()) { + finished = true; + } + } + if (finished) { + CHECK_EQ(j, this->buckets_[i].size() - 1); + saveInStore(true); + onFinished_(); + } else { + if (j + 1 < this->buckets_[i].size()) { + auto& task = this->tasks_[this->buckets_[i][j + 1]]; + task.invoke(); + } + } + }; // onError + } // for (auto j = 0; j < buckets_[i].size(); j++) + } // for (auto i = 0; i < buckets_.size(); i++) + + for (auto& bucket : buckets_) { + if (!bucket.empty()) { + tasks_[bucket[0]].invoke(); + } + } + saveInStore(true); +} + +bool BalancePlan::saveInStore(bool onlyPlan) { + if (kv_) { + std::vector data; + data.emplace_back(planKey(), planVal()); + if (!onlyPlan) { + for (auto& task : tasks_) { + data.emplace_back(task.taskKey(), task.taskVal()); + } + } + folly::Baton baton; + bool ret = false; + kv_->asyncMultiPut(kDefaultSpaceId, + kDefaultPartId, + std::move(data), + [this, &baton, &ret] (kvstore::ResultCode code) { + if (kvstore::ResultCode::SUCCEEDED == code) { + ret = true; + } else { + LOG(ERROR) << "Can't write the kvstore, ret = " << static_cast(code); + } + baton.post(); + }); + baton.wait(); + return ret; + } + return true; +} + +bool BalancePlan::recovery() { + if (kv_) { + const auto& prefix = BalanceTask::prefix(id_); + std::unique_ptr iter; + auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "Can't access kvstore, ret = " << static_cast(ret); + return false; + } + while (iter->valid()) { + BalanceTask task; + task.kv_ = kv_; + task.client_ = client_; + { + auto tup = BalanceTask::parseKey(iter->key()); + task.balanceId_ = std::get<0>(tup); + task.spaceId_ = std::get<1>(tup); + task.partId_ = std::get<2>(tup); + task.src_ = std::get<3>(tup); + task.dst_ = std::get<4>(tup); + } + { + auto tup = BalanceTask::parseVal(iter->val()); + task.status_ = std::get<0>(tup); + task.ret_ = std::get<1>(tup); + if (task.ret_ == BalanceTask::Result::FAILED) { + // Resume the failed task. + task.ret_ = BalanceTask::Result::IN_PROGRESS; + } + task.startTimeMs_ = std::get<2>(tup); + task.endTimeMs_ = std::get<3>(tup); + } + tasks_.emplace_back(std::move(task)); + iter->next(); + } + } + return true; +} + +std::string BalancePlan::planKey() const { + std::string str; + str.reserve(48); + str.append(reinterpret_cast(kBalancePlanTable.data()), kBalancePlanTable.size()); + str.append(reinterpret_cast(&id_), sizeof(id_)); + return str; +} + +std::string BalancePlan::planVal() const { + std::string str; + str.append(reinterpret_cast(&status_), sizeof(status_)); + return str; +} + +const std::string& BalancePlan::prefix() { + return kBalancePlanTable; +} + +BalanceID BalancePlan::id(const folly::StringPiece& rawKey) { + return *reinterpret_cast(rawKey.begin() + kBalancePlanTable.size()); +} + +BalancePlan::Status BalancePlan::status(const folly::StringPiece& rawVal) { + return static_cast(*rawVal.begin()); +} + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/admin/BalancePlan.h b/src/meta/processors/admin/BalancePlan.h new file mode 100644 index 00000000000..f52b031c960 --- /dev/null +++ b/src/meta/processors/admin/BalancePlan.h @@ -0,0 +1,95 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_ADMIN_BALANCEPLAN_H_ +#define META_ADMIN_BALANCEPLAN_H_ + +#include +#include "kvstore/KVStore.h" +#include "meta/processors/admin/BalanceTask.h" + +namespace nebula { +namespace meta { + +class BalancePlan { + friend class Balancer; + FRIEND_TEST(BalanceTest, BalancePlanTest); + FRIEND_TEST(BalanceTest, NormalTest); + FRIEND_TEST(BalanceTest, RecoveryTest); + FRIEND_TEST(BalanceTest, DispatchTasksTest); + +public: + enum class Status : uint8_t { + NOT_START = 0x01, + IN_PROGRESS = 0x02, + SUCCEEDED = 0x03, + /** + * TODO(heng): Currently, after the plan failed, we will try to resume it + * when running "balance" again. But in many cases, the plan will be failed + * forever, it this cases, we should rollback the plan. + * */ + FAILED = 0x04, + }; + + BalancePlan(BalanceID id, kvstore::KVStore* kv, AdminClient* client) + : id_(id) + , kv_(kv) + , client_(client) {} + + void addTask(BalanceTask task) { + tasks_.emplace_back(std::move(task)); + } + + void invoke(); + + /** + * TODO(heng): How to rollback if the some tasks failed. + * For the tasks before UPDATE_META, they will go back to the original state before balance. + * For the tasks after UPDATE_META, they will go on until succeeded. + * NOTES: update_meta should be an atomic operation. There is no middle state inside. + * */ + void rollback() {} + + bool saveInStore(bool onlyPlan = false); + + BalanceID id() const { + return id_; + } + +private: + bool recovery(); + + std::string planKey() const; + + std::string planVal() const; + + void dispatchTasks(); + + static const std::string& prefix(); + + static BalanceID id(const folly::StringPiece& rawKey); + + static BalancePlan::Status status(const folly::StringPiece& rawVal); + +private: + BalanceID id_ = 0; + kvstore::KVStore* kv_ = nullptr; + AdminClient* client_ = nullptr; + std::vector tasks_; + std::mutex lock_; + size_t finishedTaskNum_ = 0; + std::function onFinished_; + Status status_ = Status::NOT_START; + + // List of task index in tasks_; + using Bucket = std::vector; + std::vector buckets_; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_ADMIN_BALANCEPLAN_H_ diff --git a/src/meta/processors/admin/BalanceProcessor.cpp b/src/meta/processors/admin/BalanceProcessor.cpp new file mode 100644 index 00000000000..56e27b6c37b --- /dev/null +++ b/src/meta/processors/admin/BalanceProcessor.cpp @@ -0,0 +1,49 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + + +#include "meta/processors/admin/BalanceProcessor.h" +#include "meta/processors/admin/Balancer.h" + +namespace nebula { +namespace meta { + +void BalanceProcessor::process(const cpp2::BalanceReq& req) { + if (req.get_space_id() != nullptr) { + LOG(ERROR) << "Unsupport balance for specific space " << *req.get_space_id(); + resp_.set_code(cpp2::ErrorCode::E_UNSUPPORTED); + onFinished(); + return; + } + if (req.get_id() != nullptr) { + LOG(ERROR) << "Unsupport show status for specific balance plan, id=" << *req.get_id(); + resp_.set_code(cpp2::ErrorCode::E_UNSUPPORTED); + onFinished(); + return; + } + auto hosts = ActiveHostsMan::instance()->getActiveHosts(); + if (hosts.empty()) { + LOG(ERROR) << "There is no active hosts"; + resp_.set_code(cpp2::ErrorCode::E_NO_HOSTS); + onFinished(); + return; + } + auto ret = Balancer::instance(kvstore_)->balance(); + if (!ret.ok()) { + LOG(INFO) << "The balancer is running."; + resp_.set_code(cpp2::ErrorCode::E_BALANCER_RUNNING); + onFinished(); + return; + } + resp_.set_id(ret.value()); + resp_.set_code(cpp2::ErrorCode::SUCCEEDED); + onFinished(); +} + + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/admin/BalanceProcessor.h b/src/meta/processors/admin/BalanceProcessor.h new file mode 100644 index 00000000000..82fb712b89b --- /dev/null +++ b/src/meta/processors/admin/BalanceProcessor.h @@ -0,0 +1,33 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_BALANCEPROCESSOR_H_ +#define META_BALANCEPROCESSOR_H_ + +#include +#include "meta/processors/BaseProcessor.h" +#include "meta/ActiveHostsMan.h" + +namespace nebula { +namespace meta { + +class BalanceProcessor : public BaseProcessor { +public: + static BalanceProcessor* instance(kvstore::KVStore* kvstore) { + return new BalanceProcessor(kvstore); + } + + void process(const cpp2::BalanceReq& req); + +private: + explicit BalanceProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; + +} // namespace meta +} // namespace nebula + +#endif // META_BALANCEPROCESSOR_H_ diff --git a/src/meta/processors/admin/BalanceTask.cpp b/src/meta/processors/admin/BalanceTask.cpp new file mode 100644 index 00000000000..38a5949edac --- /dev/null +++ b/src/meta/processors/admin/BalanceTask.cpp @@ -0,0 +1,234 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "meta/processors/admin/BalanceTask.h" +#include +#include "meta/processors/Common.h" + +namespace nebula { +namespace meta { + +#define SAVE_STATE() \ + if (!saveInStore()) { \ + ret_ = Result::FAILED; \ + onError_(); \ + return; \ + } + +const std::string kBalanceTaskTable = "__b_task__"; // NOLINT + +void BalanceTask::invoke() { + CHECK_NOTNULL(client_); + if (ret_ == Result::FAILED) { + endTimeMs_ = time::WallClock::fastNowInSec(); + saveInStore(); + onError_(); + return; + } + switch (status_) { + case Status::START: { + LOG(INFO) << taskIdStr_ << "Start to move part!"; + status_ = Status::CHANGE_LEADER; + ret_ = Result::IN_PROGRESS; + startTimeMs_ = time::WallClock::fastNowInSec(); + } + case Status::CHANGE_LEADER: { + LOG(INFO) << taskIdStr_ << "Ask the src to give up the leadership."; + SAVE_STATE(); + client_->transLeader(spaceId_, partId_, src_, dst_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::ADD_PART_ON_DST; + } + invoke(); + }); + break; + } + case Status::ADD_PART_ON_DST: { + LOG(INFO) << taskIdStr_ << "Open the part as learner on dst."; + SAVE_STATE(); + client_->addPart(spaceId_, partId_, dst_, true).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::ADD_LEARNER; + } + invoke(); + }); + break; + } + case Status::ADD_LEARNER: { + LOG(INFO) << taskIdStr_ << "Add learner dst."; + SAVE_STATE(); + client_->addLearner(spaceId_, partId_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::CATCH_UP_DATA; + } + invoke(); + }); + break; + } + case Status::CATCH_UP_DATA: { + LOG(INFO) << taskIdStr_ << "Waiting for the data catch up."; + SAVE_STATE(); + client_->waitingForCatchUpData(spaceId_, partId_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::MEMBER_CHANGE; + } + invoke(); + }); + break; + } + case Status::MEMBER_CHANGE: { + LOG(INFO) << taskIdStr_ << "Send member change request to the leader" + << ", it will add the new member on dst host" + << " and remove the old member on src host."; + SAVE_STATE(); + client_->memberChange(spaceId_, partId_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::UPDATE_PART_META; + } + invoke(); + }); + break; + } + case Status::UPDATE_PART_META: { + LOG(INFO) << taskIdStr_ << "Update meta for part."; + SAVE_STATE(); + client_->updateMeta(spaceId_, partId_, src_, dst_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + status_ = Status::REMOVE_PART_ON_SRC; + } + invoke(); + }); + break; + } + case Status::REMOVE_PART_ON_SRC: { + LOG(INFO) << taskIdStr_ << "Close part on src host."; + SAVE_STATE(); + client_->removePart(spaceId_, partId_, src_).thenValue([this](auto&& resp) { + if (!resp.ok()) { + ret_ = Result::FAILED; + } else { + ret_ = Result::SUCCEEDED; + status_ = Status::END; + } + invoke(); + }); + break; + } + case Status::END: { + LOG(INFO) << taskIdStr_ << "Part has been moved successfully!"; + endTimeMs_ = time::WallClock::fastNowInSec(); + SAVE_STATE(); + onFinished_(); + break; + } + } + return; +} + +void BalanceTask::rollback() { + if (status_ < Status::UPDATE_PART_META) { + // TODO(heng): restart the part on its peers. + } else { + // TODO(heng): Go on the task. + } +} + +bool BalanceTask::saveInStore() { + if (kv_) { + std::vector data; + data.emplace_back(taskKey(), taskVal()); + folly::Baton baton; + bool ret = false; + kv_->asyncMultiPut(kDefaultSpaceId, + kDefaultPartId, + std::move(data), + [this, &ret, &baton] (kvstore::ResultCode code) { + if (kvstore::ResultCode::SUCCEEDED == code) { + ret = true; + } else { + LOG(INFO) << taskIdStr_ << "Can't persist task!"; + } + baton.post(); + }); + baton.wait(); + return ret; + } + return true; +} + +std::string BalanceTask::taskKey() { + std::string str; + str.reserve(64); + str.append(reinterpret_cast(kBalanceTaskTable.data()), kBalanceTaskTable.size()); + str.append(reinterpret_cast(&balanceId_), sizeof(balanceId_)); + str.append(reinterpret_cast(&spaceId_), sizeof(spaceId_)); + str.append(reinterpret_cast(&partId_), sizeof(partId_)); + str.append(reinterpret_cast(&src_), sizeof(src_)); + str.append(reinterpret_cast(&dst_), sizeof(dst_)); + return str; +} + +std::string BalanceTask::taskVal() { + std::string str; + str.reserve(32); + str.append(reinterpret_cast(&status_), sizeof(status_)); + str.append(reinterpret_cast(&ret_), sizeof(ret_)); + str.append(reinterpret_cast(&startTimeMs_), sizeof(startTimeMs_)); + str.append(reinterpret_cast(&endTimeMs_), sizeof(endTimeMs_)); + return str; +} + +std::string BalanceTask::prefix(BalanceID balanceId) { + std::string str; + str.reserve(32); + str.append(reinterpret_cast(kBalanceTaskTable.data()), kBalanceTaskTable.size()); + str.append(reinterpret_cast(&balanceId), sizeof(balanceId)); + return str; +} + +std::tuple +BalanceTask::parseKey(const folly::StringPiece& rawKey) { + int32_t offset = kBalanceTaskTable.size(); + auto balanceId = *reinterpret_cast(rawKey.begin() + offset); + offset += sizeof(balanceId); + auto spaceId = *reinterpret_cast(rawKey.begin() + offset); + offset += sizeof(GraphSpaceID); + auto partId = *reinterpret_cast(rawKey.begin() + offset); + offset += sizeof(PartitionID); + auto src = *reinterpret_cast(rawKey.begin() + offset); + offset += sizeof(HostAddr); + auto dst = *reinterpret_cast(rawKey.begin() + offset); + return std::make_tuple(balanceId, spaceId, partId, src, dst); +} + +std::tuple +BalanceTask::parseVal(const folly::StringPiece& rawVal) { + int32_t offset = 0; + auto status = *reinterpret_cast(rawVal.begin() + offset); + offset += sizeof(BalanceTask::Status); + auto ret = *reinterpret_cast(rawVal.begin() + offset); + offset += sizeof(BalanceTask::Result); + auto start = *reinterpret_cast(rawVal.begin() + offset); + offset += sizeof(int64_t); + auto end = *reinterpret_cast(rawVal.begin() + offset); + return std::make_tuple(status, ret, start, end); +} + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/admin/BalanceTask.h b/src/meta/processors/admin/BalanceTask.h new file mode 100644 index 00000000000..e1849979cd4 --- /dev/null +++ b/src/meta/processors/admin/BalanceTask.h @@ -0,0 +1,114 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_ADMIN_BALANCETASK_H_ +#define META_ADMIN_BALANCETASK_H_ + +#include +#include "meta/ActiveHostsMan.h" +#include "time/WallClock.h" +#include "kvstore/KVStore.h" +#include "network/NetworkUtils.h" +#include "meta/processors/admin/AdminClient.h" +#include "meta/processors/Common.h" + +namespace nebula { +namespace meta { + +class BalanceTask { + friend class BalancePlan; + FRIEND_TEST(BalanceTaskTest, SimpleTest); + FRIEND_TEST(BalanceTest, BalancePlanTest); + FRIEND_TEST(BalanceTest, NormalTest); + FRIEND_TEST(BalanceTest, RecoveryTest); + +public: + BalanceTask() = default; + BalanceTask(BalanceID balanceId, + GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& src, + const HostAddr& dst, + kvstore::KVStore* kv, + AdminClient* client) + : balanceId_(balanceId) + , spaceId_(spaceId) + , partId_(partId) + , src_(src) + , dst_(dst) + , taskIdStr_(folly::stringPrintf( + "[%ld, %d, %s:%d->%s:%d] ", + balanceId, + partId, + network::NetworkUtils::intToIPv4(src.first).c_str(), + src.second, + network::NetworkUtils::intToIPv4(dst.first).c_str(), + dst.second)) + , kv_(kv) + , client_(client) {} + + const std::string& taskIdStr() const { + return taskIdStr_; + } + + void invoke(); + + void rollback(); + +private: + enum class Status : uint8_t { + START = 0x01, + CHANGE_LEADER = 0x02, + ADD_PART_ON_DST = 0x03, + ADD_LEARNER = 0x04, + CATCH_UP_DATA = 0x05, + MEMBER_CHANGE = 0x06, + UPDATE_PART_META = 0x07, // After this state, we can't rollback anymore. + REMOVE_PART_ON_SRC = 0x08, + END = 0xFF, + }; + + enum class Result : uint8_t { + SUCCEEDED = 0x01, + FAILED = 0x02, + IN_PROGRESS = 0x03, + }; + + bool saveInStore(); + + std::string taskKey(); + + std::string taskVal(); + + static std::string prefix(BalanceID balanceId); + + static std::tuple + parseKey(const folly::StringPiece& rawKey); + + static std::tuple + parseVal(const folly::StringPiece& rawVal); + +private: + BalanceID balanceId_; + GraphSpaceID spaceId_; + PartitionID partId_; + HostAddr src_; + HostAddr dst_; + std::string taskIdStr_; + kvstore::KVStore* kv_ = nullptr; + AdminClient* client_ = nullptr; + Status status_ = Status::START; + Result ret_ = Result::IN_PROGRESS; + int64_t startTimeMs_ = 0; + int64_t endTimeMs_ = 0; + std::function onFinished_; + std::function onError_; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_ADMIN_BALANCETASK_H_ diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp new file mode 100644 index 00000000000..6d5752b1219 --- /dev/null +++ b/src/meta/processors/admin/Balancer.cpp @@ -0,0 +1,316 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "meta/processors/admin/Balancer.h" +#include +#include +#include "meta/processors/Common.h" +#include "meta/ActiveHostsMan.h" +#include "meta/MetaServiceUtils.h" + +namespace nebula { +namespace meta { + +StatusOr Balancer::balance() { + bool expected = false; + if (running_.compare_exchange_strong(expected, true)) { + if (!recovery()) { + LOG(ERROR) << "Recovery balancer failed!"; + return Status::Error("Can't do balance because there is one corruptted balance plan!"); + } + if (plan_ == nullptr) { + LOG(INFO) << "There is no corrupted plan need to recovery, so create a new one"; + auto status = buildBalancePlan(); + if (plan_ == nullptr) { + LOG(ERROR) << "Create balance plan failed!"; + return status; + } + } + executor_->add(std::bind(&BalancePlan::invoke, plan_.get())); + return plan_->id(); + } + return Status::Error("balance running"); +} + +bool Balancer::recovery() { + CHECK(!plan_) << "plan should be nullptr now"; + if (kv_) { + const auto& prefix = BalancePlan::prefix(); + std::unique_ptr iter; + auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "Can't access kvstore, ret = " << static_cast(ret); + return false; + } + std::vector corruptedPlans; + while (iter->valid()) { + auto balanceId = BalancePlan::id(iter->key()); + auto status = BalancePlan::status(iter->val()); + if (status == BalancePlan::Status::IN_PROGRESS + || status == BalancePlan::Status::FAILED) { + corruptedPlans.emplace_back(balanceId); + } + iter->next(); + } + if (corruptedPlans.empty()) { + LOG(INFO) << "No corrupted plan need to recovery!"; + return true; + } + CHECK_EQ(1, corruptedPlans.size()); + plan_ = std::make_unique(corruptedPlans[0], kv_, client_.get()); + plan_->onFinished_ = [this] () { + plan_.reset(); + bool expected = true; + CHECK(running_.compare_exchange_strong(expected, false)); + }; + if (!plan_->recovery()) { + LOG(ERROR) << "Can't recovery plan " << corruptedPlans[0]; + plan_->onFinished_(); + return false; + } + } + return true; +} + +Status Balancer::buildBalancePlan() { + CHECK(!plan_) << "plan should be nullptr now"; + std::vector spaces; + { + // Get all spaces + folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + auto prefix = MetaServiceUtils::spacePrefix(); + std::unique_ptr iter; + auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + running_ = false; + return Status::Error("Can't access kvstore, ret = %d", static_cast(ret)); + } + while (iter->valid()) { + auto spaceId = MetaServiceUtils::spaceId(iter->key()); + spaces.push_back(spaceId); + iter->next(); + } + } + plan_ = std::make_unique(time::WallClock::fastNowInSec(), kv_, client_.get()); + for (auto spaceId : spaces) { + auto tasks = genTasks(spaceId); + for (auto& task : tasks) { + plan_->addTask(std::move(task)); + } + } + plan_->onFinished_ = [this] () { + plan_.reset(); + bool expected = true; + CHECK(running_.compare_exchange_strong(expected, false)); + }; + if (plan_->tasks_.empty()) { + plan_->onFinished_(); + return Status::Error("No Tasks"); + } + if (!plan_->saveInStore()) { + plan_->onFinished_(); + return Status::Error("Can't persist the plan"); + } + return Status::OK(); +} + +std::vector Balancer::genTasks(GraphSpaceID spaceId) { + CHECK(!!plan_) << "plan should not be nullptr"; + std::unordered_map> hostParts; + int32_t totalParts = 0; + getHostParts(spaceId, hostParts, totalParts); + if (totalParts == 0 || hostParts.empty()) { + LOG(ERROR) << "Invalid space " << spaceId; + return std::vector(); + } + auto activeHosts = ActiveHostsMan::instance()->getActiveHosts(); + std::vector newlyAdded; + std::vector lost; + calDiff(hostParts, activeHosts, newlyAdded, lost); + decltype(hostParts) newHostParts(hostParts); + for (auto& h : newlyAdded) { + newHostParts.emplace(h, std::vector()); + } + for (auto& h : lost) { + newHostParts.erase(h); + } + LOG(INFO) << "Now, try to balance the newHostParts"; + // We have two parts need to balance, the first one is parts on lost hosts + // The seconds one is parts on unbalanced host in newHostParts. + std::vector tasks; + for (auto& h : lost) { + auto& lostParts = hostParts[h]; + for (auto& partId : lostParts) { + auto ret = hostWithMinimalParts(newHostParts, partId); + if (!ret.ok()) { + LOG(ERROR) << "Error:" << ret.status(); + return std::vector(); + } + auto& luckyHost = ret.value(); + newHostParts[luckyHost].emplace_back(partId); + tasks.emplace_back(plan_->id_, + spaceId, + partId, + h, + luckyHost, + kv_, + client_.get()); + } + } + if (newHostParts.size() < 2) { + LOG(INFO) << "Too few hosts, no need for balance!"; + return tasks; + } + balanceParts(plan_->id_, spaceId, newHostParts, totalParts, tasks); + return tasks; +} + +void Balancer::balanceParts(BalanceID balanceId, + GraphSpaceID spaceId, + std::unordered_map>& newHostParts, + int32_t totalParts, + std::vector& tasks) { + float avgLoad = static_cast(totalParts)/newHostParts.size(); + LOG(INFO) << "The expect avg load is " << avgLoad; + int32_t minLoad = std::floor(avgLoad); + int32_t maxLoad = std::ceil(avgLoad); + auto hosts = sortedHostsByParts(newHostParts); + CHECK_GT(hosts.size(), 1); + auto maxPartsHost = hosts.back(); + auto minPartsHost = hosts.front(); + auto lastDelta = maxPartsHost.second - minPartsHost.second + 1; + while (maxPartsHost.second > maxLoad + || minPartsHost.second < minLoad + || maxPartsHost.second - minPartsHost.second < lastDelta) { + CHECK_GE(maxPartsHost.second, avgLoad); + CHECK_GE(avgLoad, minPartsHost.second); + auto& partsFrom = newHostParts[maxPartsHost.first]; + auto& partsTo = newHostParts[minPartsHost.first]; + VLOG(1) << maxPartsHost.first << ":" << partsFrom.size() + << "->" << minPartsHost.first << ":" << partsTo.size() + << ", lastDelta=" << lastDelta; + std::vector diff; + std::set_difference(partsFrom.begin(), partsFrom.end(), partsTo.begin(), partsTo.end(), + std::inserter(diff, diff.begin())); + bool noAction = true; + for (auto& partId : diff) { + if (partsFrom.size() <= partsTo.size() + 1 + || partsFrom.size() <= (size_t)minLoad + || partsTo.size() >= (size_t)maxLoad) { + VLOG(1) << "No need to move any parts from " + << maxPartsHost.first << " to " << minPartsHost.first; + break; + } + VLOG(1) << maxPartsHost.first << "->" << minPartsHost.first << ": " << partId; + auto it = std::find(partsFrom.begin(), partsFrom.end(), partId); + CHECK(it != partsFrom.end()); + partsFrom.erase(it); + partsTo.emplace_back(partId); + tasks.emplace_back(balanceId, + spaceId, + partId, + maxPartsHost.first, + minPartsHost.first, + kv_, + client_.get()); + noAction = false; + } + if (noAction) { + break; + } + lastDelta = maxPartsHost.second - minPartsHost.second; + hosts = sortedHostsByParts(newHostParts); + maxPartsHost = hosts.back(); + minPartsHost = hosts.front(); + } + LOG(INFO) << "Balance tasks num: " << tasks.size(); + for (auto& task : tasks) { + LOG(INFO) << task.taskIdStr(); + } +} + +void Balancer::getHostParts(GraphSpaceID spaceId, + std::unordered_map>& hostParts, + int32_t& totalParts) { + folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + auto prefix = MetaServiceUtils::partPrefix(spaceId); + std::unique_ptr iter; + auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId; + return; + } + while (iter->valid()) { + auto key = iter->key(); + PartitionID partId; + memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID)); + auto partHosts = MetaServiceUtils::parsePartVal(iter->val()); + for (auto& ph : partHosts) { + hostParts[HostAddr(ph.ip, ph.port)].emplace_back(partId); + } + iter->next(); + totalParts++; + } + auto key = MetaServiceUtils::spaceKey(spaceId); + std::string value; + auto code = kv_->get(kDefaultSpaceId, kDefaultPartId, key, &value); + if (code != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId; + return; + } + auto properties = MetaServiceUtils::parseSpace(value); + CHECK_EQ(totalParts, properties.get_partition_num()); + totalParts *= properties.get_replica_factor(); +} + +void Balancer::calDiff(const std::unordered_map>& hostParts, + const std::vector& activeHosts, + std::vector& newlyAdded, + std::vector& lost) { + for (auto it = hostParts.begin(); it != hostParts.end(); it++) { + VLOG(3) << "Host " << it->first << ", parts " << it->second.size(); + if (std::find(activeHosts.begin(), activeHosts.end(), it->first) == activeHosts.end()) { + lost.emplace_back(it->first); + } + } + for (auto& h : activeHosts) { + VLOG(3) << "Actvie Host " << h; + if (hostParts.find(h) == hostParts.end()) { + newlyAdded.emplace_back(h); + } + } +} + +std::vector> +Balancer::sortedHostsByParts(const std::unordered_map>& hostParts) { + std::vector> hosts; + for (auto it = hostParts.begin(); it != hostParts.end(); it++) { + hosts.emplace_back(it->first, it->second.size()); + } + std::sort(hosts.begin(), hosts.end(), [](const auto& l, const auto& r) { + return l.second < r.second; + }); + return hosts; +} + +StatusOr Balancer::hostWithMinimalParts( + const std::unordered_map>& hostParts, + PartitionID partId) { + auto hosts = sortedHostsByParts(hostParts); + for (auto& h : hosts) { + auto it = hostParts.find(h.first); + CHECK(it != hostParts.end()); + if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end()) { + return h.first; + } + } + return Status::Error("No host is suitable for %d", partId); +} + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/admin/Balancer.h b/src/meta/processors/admin/Balancer.h new file mode 100644 index 00000000000..1e906688690 --- /dev/null +++ b/src/meta/processors/admin/Balancer.h @@ -0,0 +1,142 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_ADMIN_BALANCERR_H_ +#define META_ADMIN_BALANCERR_H_ + +#include +#include +#include "kvstore/KVStore.h" +#include "network/NetworkUtils.h" +#include "time/WallClock.h" +#include "meta/processors/admin/AdminClient.h" +#include "meta/processors/admin/BalanceTask.h" +#include "meta/processors/admin/BalancePlan.h" + +namespace nebula { +namespace meta { +/** +There are two interfaces public: + * Balance: it will construct a balance plan and invoked it. If last balance plan is not succeeded, it will + * try to resume it. + * + * Rollback: In many cases, if some plan failed forever, we call this interface to rollback. + +Some notes: +1. Balance will generate balance plan according to current active hosts and parts allocation +2. For the plan, we hope after moving the least parts , it will reach a reasonable state. +3. Only one balance plan could be invoked at the same time. +4. Each balance plan has one id, and we could show the status by "balance id" command + and after FO, we could resume the balance plan by type "balance" again. +5. Each balance plan contains many balance tasks, the task represents the minimum movement unit. +6. We save the whole balancePlan state in kvstore to do failover. +7. Each balance task contains serval steps. And it should be executed step by step. +8. One task failed will result in the whole balance plan failed. +9. Currently, we hope tasks for the same part could be invoked serially + * */ +class Balancer { + FRIEND_TEST(BalanceTest, BalancePartsTest); + FRIEND_TEST(BalanceTest, NormalTest); + FRIEND_TEST(BalanceTest, RecoveryTest); + +public: + static Balancer* instance(kvstore::KVStore* kv) { + static std::unique_ptr client(new AdminClient()); + static std::unique_ptr balancer(new Balancer(kv, std::move(client))); + return balancer.get(); + } + + ~Balancer() = default; + + /* + * Return Error if reject the balance request, otherwise return balance id. + * */ + StatusOr balance(); + + /** + * TODO(heng): Rollback some specific balance id + */ + Status rollback(BalanceID id) { + return Status::Error("unplemented, %ld", id); + } + + /** + * TODO(heng): Only generate balance plan for our users. + * */ + const BalancePlan* preview() { + return plan_.get(); + } + + /** + * TODO(heng): Execute balance plan from outside. + * */ + Status execute(BalancePlan plan) { + UNUSED(plan); + return Status::Error("Unsupport it yet!"); + } + + /** + * TODO(heng): Execute specific balance plan by id. + * */ + Status execute(BalanceID id) { + UNUSED(id); + return Status::Error("Unsupport it yet!"); + } + +private: + Balancer(kvstore::KVStore* kv, std::unique_ptr client) + : kv_(kv) + , client_(std::move(client)) { + executor_.reset(new folly::CPUThreadPoolExecutor(1)); + } + /* + * When the balancer failover, we should recovery the status. + * */ + bool recovery(); + + /** + * Build balance plan and save it in kvstore. + * */ + Status buildBalancePlan(); + + std::vector genTasks(GraphSpaceID spaceId); + + void getHostParts(GraphSpaceID spaceId, + std::unordered_map>& hostParts, + int32_t& totalParts); + + void calDiff(const std::unordered_map>& hostParts, + const std::vector& activeHosts, + std::vector& newlyAdded, + std::vector& lost); + + StatusOr hostWithMinimalParts( + const std::unordered_map>& hostParts, + PartitionID partId); + + void balanceParts(BalanceID balanceId, + GraphSpaceID spaceId, + std::unordered_map>& newHostParts, + int32_t totalParts, + std::vector& tasks); + + + std::vector> + sortedHostsByParts(const std::unordered_map>& hostParts); + +private: + std::atomic_bool running_{false}; + kvstore::KVStore* kv_ = nullptr; + std::unique_ptr client_{nullptr}; + // Current running plan. + std::unique_ptr plan_{nullptr}; + std::unique_ptr executor_; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_ADMIN_BALANCERR_H_ diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp new file mode 100644 index 00000000000..3906a7458e3 --- /dev/null +++ b/src/meta/test/BalancerTest.cpp @@ -0,0 +1,630 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ +#include "base/Base.h" +#include +#include +#include +#include "meta/processors/admin/Balancer.h" +#include "meta/test/TestUtils.h" +#include "fs/TempDir.h" +#include "meta/processors/partsMan/CreateSpaceProcessor.h" + +DECLARE_uint32(task_concurrency); + +namespace nebula { +namespace meta { + +class TestFaultInjector : public FaultInjector { +public: + explicit TestFaultInjector(std::vector sts) + : statusArray_(std::move(sts)) { + executor_.reset(new folly::CPUThreadPoolExecutor(1)); + } + + ~TestFaultInjector() { + } + + folly::Future response(int index) { + folly::Promise pro; + auto f = pro.getFuture(); + executor_->add([this, p = std::move(pro), index]() mutable { + p.setValue(this->statusArray_[index]); + }); + return f; + } + + folly::Future transLeader() override { + return response(0); + } + + folly::Future addPart() override { + return response(1); + } + + folly::Future addLearner() override { + return response(2); + } + + folly::Future waitingForCatchUpData() override { + return response(3); + } + + folly::Future memberChange() override { + return response(4); + } + + folly::Future updateMeta() override { + return response(5); + } + + folly::Future removePart() override { + return response(6); + } + + void reset(std::vector sts) { + statusArray_ = std::move(sts); + } + +private: + std::vector statusArray_; + std::unique_ptr executor_; +}; + +TEST(BalanceTaskTest, SimpleTest) { + { + std::vector sts(7, Status::OK()); + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + auto client = std::make_unique(std::move(injector)); + BalanceTask task(0, 0, 0, HostAddr(0, 0), HostAddr(1, 1), nullptr, nullptr); + folly::Baton b; + task.onFinished_ = [&]() { + LOG(INFO) << "Task finished!"; + EXPECT_EQ(BalanceTask::Result::SUCCEEDED, task.ret_); + EXPECT_EQ(BalanceTask::Status::END, task.status_); + b.post(); + }; + task.onError_ = []() { + LOG(FATAL) << "We should not reach here!"; + }; + task.client_ = client.get(); + task.invoke(); + b.wait(); + } + { + std::vector sts{Status::Error("transLeader failed!"), + Status::OK(), + Status::OK(), + Status::OK(), + Status::OK(), + Status::OK(), + Status::OK()}; + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + auto client = std::make_unique(std::move(injector)); + BalanceTask task(0, 0, 0, HostAddr(0, 0), HostAddr(1, 1), nullptr, nullptr); + folly::Baton b; + task.onFinished_ = []() { + LOG(FATAL) << "We should not reach here!"; + }; + task.onError_ = [&]() { + LOG(INFO) << "Error happens!"; + EXPECT_EQ(BalanceTask::Result::FAILED, task.ret_); + EXPECT_EQ(BalanceTask::Status::CHANGE_LEADER, task.status_); + b.post(); + }; + task.client_ = client.get(); + task.invoke(); + b.wait(); + } + LOG(INFO) << "Test finished!"; +} + +TEST(BalanceTest, BalancePartsTest) { + auto* balancer = Balancer::instance(nullptr); + auto dump = [](const std::unordered_map>& hostParts, + const std::vector& tasks) { + for (auto it = hostParts.begin(); it != hostParts.end(); it++) { + std::stringstream ss; + ss << it->first << ":"; + for (auto partId : it->second) { + ss << partId << ","; + } + VLOG(1) << ss.str(); + } + for (auto& task : tasks) { + VLOG(1) << task.taskIdStr(); + } + }; + { + std::unordered_map> hostParts; + hostParts.emplace(HostAddr(0, 0), std::vector{1, 2, 3, 4}); + hostParts.emplace(HostAddr(1, 0), std::vector{1, 2, 3, 4}); + hostParts.emplace(HostAddr(2, 0), std::vector{1, 2, 3, 4}); + hostParts.emplace(HostAddr(3, 0), std::vector{}); + int32_t totalParts = 12; + std::vector tasks; + VLOG(1) << "=== original map ===="; + dump(hostParts, tasks); + balancer->balanceParts(0, 0, hostParts, totalParts, tasks); + VLOG(1) << "=== new map ===="; + dump(hostParts, tasks); + for (auto it = hostParts.begin(); it != hostParts.end(); it++) { + EXPECT_EQ(3, it->second.size()); + } + EXPECT_EQ(3, tasks.size()); + } + { + std::unordered_map> hostParts; + hostParts.emplace(HostAddr(0, 0), std::vector{1, 2, 3, 4, 5}); + hostParts.emplace(HostAddr(1, 0), std::vector{1, 2, 4, 5}); + hostParts.emplace(HostAddr(2, 0), std::vector{2, 3, 4, 5}); + hostParts.emplace(HostAddr(3, 0), std::vector{1, 3}); + int32_t totalParts = 15; + std::vector tasks; + VLOG(1) << "=== original map ===="; + dump(hostParts, tasks); + balancer->balanceParts(0, 0, hostParts, totalParts, tasks); + VLOG(1) << "=== new map ===="; + dump(hostParts, tasks); + EXPECT_EQ(4, hostParts[HostAddr(0, 0)].size()); + EXPECT_EQ(4, hostParts[HostAddr(1, 0)].size()); + EXPECT_EQ(4, hostParts[HostAddr(2, 0)].size()); + EXPECT_EQ(3, hostParts[HostAddr(3, 0)].size()); + EXPECT_EQ(1, tasks.size()); + } + { + std::unordered_map> hostParts; + hostParts.emplace(HostAddr(0, 0), std::vector{1, 2, 3, 4}); + hostParts.emplace(HostAddr(1, 0), std::vector{1, 2, 4, 5}); + hostParts.emplace(HostAddr(2, 0), std::vector{2, 3, 4, 5}); + hostParts.emplace(HostAddr(3, 0), std::vector{1, 3, 5}); + int32_t totalParts = 15; + std::vector tasks; + VLOG(1) << "=== original map ===="; + dump(hostParts, tasks); + balancer->balanceParts(0, 0, hostParts, totalParts, tasks); + VLOG(1) << "=== new map ===="; + dump(hostParts, tasks); + EXPECT_EQ(4, hostParts[HostAddr(0, 0)].size()); + EXPECT_EQ(4, hostParts[HostAddr(1, 0)].size()); + EXPECT_EQ(4, hostParts[HostAddr(2, 0)].size()); + EXPECT_EQ(3, hostParts[HostAddr(3, 0)].size()); + EXPECT_EQ(0, tasks.size()); + } + { + std::unordered_map> hostParts; + hostParts.emplace(HostAddr(0, 0), std::vector{1, 2, 3, 4, 5, 6, 7, 8, 9}); + hostParts.emplace(HostAddr(1, 0), std::vector{1, 2, 3, 4, 5, 6, 7, 8, 9}); + hostParts.emplace(HostAddr(2, 0), std::vector{1, 2, 3, 4, 5, 6, 7, 8, 9}); + hostParts.emplace(HostAddr(3, 0), std::vector{}); + hostParts.emplace(HostAddr(4, 0), std::vector{}); + hostParts.emplace(HostAddr(5, 0), std::vector{}); + hostParts.emplace(HostAddr(6, 0), std::vector{}); + hostParts.emplace(HostAddr(7, 0), std::vector{}); + hostParts.emplace(HostAddr(8, 0), std::vector{}); + int32_t totalParts = 27; + std::vector tasks; + VLOG(1) << "=== original map ===="; + dump(hostParts, tasks); + balancer->balanceParts(0, 0, hostParts, totalParts, tasks); + VLOG(1) << "=== new map ===="; + dump(hostParts, tasks); + for (auto it = hostParts.begin(); it != hostParts.end(); it++) { + EXPECT_EQ(3, it->second.size()); + } + EXPECT_EQ(18, tasks.size()); + } + { + std::unordered_map> hostParts; + hostParts.emplace(HostAddr(0, 0), std::vector{1, 2, 3, 4, 5, 6, 7, 8, 9}); + hostParts.emplace(HostAddr(1, 0), std::vector{1, 2, 3, 4, 5, 6, 7, 8, 9}); + hostParts.emplace(HostAddr(2, 0), std::vector{1, 2, 3, 4, 5, 6, 7, 8, 9}); + hostParts.emplace(HostAddr(3, 0), std::vector{}); + hostParts.emplace(HostAddr(4, 0), std::vector{}); + hostParts.emplace(HostAddr(5, 0), std::vector{}); + hostParts.emplace(HostAddr(6, 0), std::vector{}); + hostParts.emplace(HostAddr(7, 0), std::vector{}); + int32_t totalParts = 27; + std::vector tasks; + VLOG(1) << "=== original map ===="; + dump(hostParts, tasks); + balancer->balanceParts(0, 0, hostParts, totalParts, tasks); + VLOG(1) << "=== new map ===="; + dump(hostParts, tasks); + for (auto it = hostParts.begin(); it != hostParts.end(); it++) { + EXPECT_GE(4, it->second.size()); + EXPECT_LE(3, it->second.size()); + } + } +} + +TEST(BalanceTest, DispatchTasksTest) { + { + FLAGS_task_concurrency = 10; + BalancePlan plan(0L, nullptr, nullptr); + for (int i = 0; i < 20; i++) { + BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr); + plan.addTask(std::move(task)); + } + plan.dispatchTasks(); + // All tasks is about space 0, part 0. + // So they will be dispatched into the same bucket. + ASSERT_EQ(1, plan.buckets_.size()); + ASSERT_EQ(20, plan.buckets_[0].size()); + } + { + FLAGS_task_concurrency = 10; + BalancePlan plan(0L, nullptr, nullptr); + for (int i = 0; i < 5; i++) { + BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr); + plan.addTask(std::move(task)); + } + plan.dispatchTasks(); + ASSERT_EQ(5, plan.buckets_.size()); + for (auto& bucket : plan.buckets_) { + ASSERT_EQ(1, bucket.size()); + } + } + { + FLAGS_task_concurrency = 20; + BalancePlan plan(0L, nullptr, nullptr); + for (int i = 0; i < 5; i++) { + BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr); + plan.addTask(std::move(task)); + } + for (int i = 0; i < 10; i++) { + BalanceTask task(0, 0, i, HostAddr(i, 2), HostAddr(i, 3), nullptr, nullptr); + plan.addTask(std::move(task)); + } + plan.dispatchTasks(); + ASSERT_EQ(10, plan.buckets_.size()); + int32_t total = 0; + for (auto i = 0; i < 10; i++) { + ASSERT_LE(1, plan.buckets_[i].size()); + ASSERT_GE(2, plan.buckets_[i].size()); + total += plan.buckets_[i].size(); + } + ASSERT_EQ(15, total); + } +} + +TEST(BalanceTest, BalancePlanTest) { + { + LOG(INFO) << "Test with all tasks succeeded, only one bucket!"; + BalancePlan plan(0L, nullptr, nullptr); + std::vector sts(7, Status::OK()); + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + auto client = std::make_unique(std::move(injector)); + + for (int i = 0; i < 10; i++) { + BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr); + task.client_ = client.get(); + plan.addTask(std::move(task)); + } + folly::Baton b; + plan.onFinished_ = [&plan, &b] () { + ASSERT_EQ(BalancePlan::Status::SUCCEEDED, plan.status_); + ASSERT_EQ(10, plan.finishedTaskNum_); + b.post(); + }; + plan.invoke(); + b.wait(); + // All tasks is about space 0, part 0. + // So they will be dispatched into the same bucket. + ASSERT_EQ(1, plan.buckets_.size()); + ASSERT_EQ(10, plan.buckets_[0].size()); + } + { + LOG(INFO) << "Test with all tasks succeeded, 10 buckets!"; + BalancePlan plan(0L, nullptr, nullptr); + std::vector sts(7, Status::OK()); + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + auto client = std::make_unique(std::move(injector)); + + for (int i = 0; i < 10; i++) { + BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr); + task.client_ = client.get(); + plan.addTask(std::move(task)); + } + folly::Baton b; + plan.onFinished_ = [&plan, &b] () { + ASSERT_EQ(BalancePlan::Status::SUCCEEDED, plan.status_); + ASSERT_EQ(10, plan.finishedTaskNum_); + b.post(); + }; + plan.invoke(); + b.wait(); + // All tasks is about different parts. + // So they will be dispatched into different buckets. + ASSERT_EQ(10, plan.buckets_.size()); + for (auto i = 0; i < 10; i++) { + ASSERT_EQ(1, plan.buckets_[1].size()); + } + } + { + LOG(INFO) << "Test with one task failed, 10 buckets"; + BalancePlan plan(0L, nullptr, nullptr); + std::unique_ptr client1, client2; + { + std::vector sts(7, Status::OK()); + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + client1 = std::make_unique(std::move(injector)); + for (int i = 0; i < 9; i++) { + BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr); + task.client_ = client1.get(); + plan.addTask(std::move(task)); + } + } + { + std::vector sts { + Status::Error("transLeader failed!"), + Status::OK(), + Status::OK(), + Status::OK(), + Status::OK(), + Status::OK(), + Status::OK()}; + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + client2 = std::make_unique(std::move(injector)); + BalanceTask task(0, 0, 0, HostAddr(10, 0), HostAddr(10, 1), nullptr, nullptr); + task.client_ = client2.get(); + plan.addTask(std::move(task)); + } + folly::Baton b; + plan.onFinished_ = [&plan, &b] () { + ASSERT_EQ(BalancePlan::Status::FAILED, plan.status_); + ASSERT_EQ(10, plan.finishedTaskNum_); + b.post(); + }; + plan.invoke(); + b.wait(); + } +} + +TEST(BalanceTest, NormalTest) { + fs::TempDir rootPath("/tmp/BalanceTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + FLAGS_expired_hosts_check_interval_sec = 1; + FLAGS_expired_threshold_sec = 1; + TestUtils::createSomeHosts(kv.get()); + { + cpp2::SpaceProperties properties; + properties.set_space_name("default_space"); + properties.set_partition_num(8); + properties.set_replica_factor(3); + cpp2::CreateSpaceReq req; + req.set_properties(std::move(properties)); + auto* processor = CreateSpaceProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + ASSERT_EQ(1, resp.get_id().get_space_id()); + } + std::vector sts(7, Status::OK()); + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + auto client = std::make_unique(std::move(injector)); + Balancer balancer(kv.get(), std::move(client)); + auto ret = balancer.balance(); + CHECK_EQ(Status::Error("No tasks"), ret.status()); + + sleep(1); + LOG(INFO) << "Now, we lost host " << HostAddr(3, 3); + TestUtils::registerHB({{0, 0}, {1, 1}, {2, 2}}); + ret = balancer.balance(); + CHECK(ret.ok()); + auto balanceId = ret.value(); + sleep(1); + LOG(INFO) << "Rebalance finished!"; + { + const auto& prefix = BalancePlan::prefix(); + std::unique_ptr iter; + auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); + int num = 0; + while (iter->valid()) { + auto id = BalancePlan::id(iter->key()); + auto status = BalancePlan::status(iter->val()); + ASSERT_EQ(balanceId, id); + ASSERT_EQ(BalancePlan::Status::SUCCEEDED, status); + num++; + iter->next(); + } + ASSERT_EQ(1, num); + } + { + const auto& prefix = BalanceTask::prefix(balanceId); + std::unique_ptr iter; + auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); + int32_t num = 0; + while (iter->valid()) { + BalanceTask task; + { + auto tup = BalanceTask::parseKey(iter->key()); + task.balanceId_ = std::get<0>(tup); + ASSERT_EQ(balanceId, task.balanceId_); + task.spaceId_ = std::get<1>(tup); + ASSERT_EQ(1, task.spaceId_); + task.src_ = std::get<3>(tup); + ASSERT_EQ(HostAddr(3, 3), task.src_); + } + { + auto tup = BalanceTask::parseVal(iter->val()); + task.status_ = std::get<0>(tup); + ASSERT_EQ(BalanceTask::Status::END, task.status_); + task.ret_ = std::get<1>(tup); + ASSERT_EQ(BalanceTask::Result::SUCCEEDED, task.ret_); + task.startTimeMs_ = std::get<2>(tup); + ASSERT_GT(task.startTimeMs_, 0); + task.endTimeMs_ = std::get<3>(tup); + ASSERT_GT(task.endTimeMs_, 0); + } + num++; + iter->next(); + } + ASSERT_EQ(6, num); + } +} + +TEST(BalanceTest, RecoveryTest) { + fs::TempDir rootPath("/tmp/BalanceTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + FLAGS_expired_hosts_check_interval_sec = 1; + FLAGS_expired_threshold_sec = 1; + TestUtils::createSomeHosts(kv.get()); + { + cpp2::SpaceProperties properties; + properties.set_space_name("default_space"); + properties.set_partition_num(8); + properties.set_replica_factor(3); + cpp2::CreateSpaceReq req; + req.set_properties(std::move(properties)); + auto* processor = CreateSpaceProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + ASSERT_EQ(1, resp.get_id().get_space_id()); + } + + sleep(1); + LOG(INFO) << "Now, we lost host " << HostAddr(3, 3); + TestUtils::registerHB({{0, 0}, {1, 1}, {2, 2}}); + std::vector sts { + Status::OK(), + Status::OK(), + Status::OK(), + Status::Error("catch up data failed!"), + Status::OK(), + Status::OK(), + Status::OK()}; + + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + auto client = std::make_unique(std::move(injector)); + Balancer balancer(kv.get(), std::move(client)); + auto ret = balancer.balance(); + CHECK(ret.ok()); + auto balanceId = ret.value(); + sleep(1); + { + const auto& prefix = BalancePlan::prefix(); + std::unique_ptr iter; + auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); + int num = 0; + while (iter->valid()) { + auto id = BalancePlan::id(iter->key()); + auto status = BalancePlan::status(iter->val()); + ASSERT_EQ(balanceId, id); + ASSERT_EQ(BalancePlan::Status::FAILED, status); + num++; + iter->next(); + } + ASSERT_EQ(1, num); + } + { + const auto& prefix = BalanceTask::prefix(balanceId); + std::unique_ptr iter; + auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); + int32_t num = 0; + while (iter->valid()) { + BalanceTask task; + { + auto tup = BalanceTask::parseKey(iter->key()); + task.balanceId_ = std::get<0>(tup); + ASSERT_EQ(balanceId, task.balanceId_); + task.spaceId_ = std::get<1>(tup); + ASSERT_EQ(1, task.spaceId_); + task.src_ = std::get<3>(tup); + ASSERT_EQ(HostAddr(3, 3), task.src_); + } + { + auto tup = BalanceTask::parseVal(iter->val()); + task.status_ = std::get<0>(tup); + ASSERT_EQ(BalanceTask::Status::CATCH_UP_DATA, task.status_); + task.ret_ = std::get<1>(tup); + ASSERT_EQ(BalanceTask::Result::FAILED, task.ret_); + task.startTimeMs_ = std::get<2>(tup); + ASSERT_GT(task.startTimeMs_, 0); + task.endTimeMs_ = std::get<3>(tup); + ASSERT_GT(task.endTimeMs_, 0); + } + num++; + iter->next(); + } + ASSERT_EQ(6, num); + } + LOG(INFO) << "Now let's recovery it."; + std::vector normalSts(7, Status::OK()); + static_cast(balancer.client_->faultInjector())->reset(std::move(normalSts)); + ret = balancer.balance(); + CHECK(ret.ok()); + balanceId = ret.value(); + sleep(1); + { + const auto& prefix = BalancePlan::prefix(); + std::unique_ptr iter; + auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); + int num = 0; + while (iter->valid()) { + auto id = BalancePlan::id(iter->key()); + auto status = BalancePlan::status(iter->val()); + ASSERT_EQ(balanceId, id); + ASSERT_EQ(BalancePlan::Status::SUCCEEDED, status); + num++; + iter->next(); + } + ASSERT_EQ(1, num); + } + { + const auto& prefix = BalanceTask::prefix(balanceId); + std::unique_ptr iter; + auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED); + int32_t num = 0; + while (iter->valid()) { + BalanceTask task; + { + auto tup = BalanceTask::parseKey(iter->key()); + task.balanceId_ = std::get<0>(tup); + ASSERT_EQ(balanceId, task.balanceId_); + task.spaceId_ = std::get<1>(tup); + ASSERT_EQ(1, task.spaceId_); + task.src_ = std::get<3>(tup); + ASSERT_EQ(HostAddr(3, 3), task.src_); + } + { + auto tup = BalanceTask::parseVal(iter->val()); + task.status_ = std::get<0>(tup); + ASSERT_EQ(BalanceTask::Status::END, task.status_); + task.ret_ = std::get<1>(tup); + ASSERT_EQ(BalanceTask::Result::SUCCEEDED, task.ret_); + task.startTimeMs_ = std::get<2>(tup); + ASSERT_GT(task.startTimeMs_, 0); + task.endTimeMs_ = std::get<3>(tup); + ASSERT_GT(task.endTimeMs_, 0); + } + num++; + iter->next(); + } + ASSERT_EQ(6, num); + } +} + +} // namespace meta +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 77061e4f336..2d040d1cc80 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -137,7 +137,6 @@ nebula_link_libraries( ) nebula_add_test(active_hosts_man_test) - add_executable( meta_http_test MetaHttpHandlerTest.cpp @@ -171,6 +170,35 @@ nebula_link_libraries( gtest ) nebula_add_test(meta_http_test) + +add_executable( + balancer_test + BalancerTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +nebula_link_libraries( + balancer_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + gtest +) +nebula_add_test(balancer_test) + add_executable( authentication_test AuthProcessorTest.cpp @@ -192,12 +220,11 @@ add_executable( ) nebula_link_libraries( authentication_test - proxygenhttpserver - proxygenlib ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} wangle gtest ) + nebula_add_test(authentication_test)