From c7c1818035fa88f0d552d400042f173fae29bf14 Mon Sep 17 00:00:00 2001
From: Doodle <13706157+critical27@users.noreply.github.com>
Date: Wed, 28 Aug 2019 11:03:56 +0800
Subject: [PATCH] Leader balance (#731)

Basic leader balance for storage: keep transferring leaders from the host
with the most leaders to the host with the fewest, until all hosts are
balanced. Modify/add two commands in the console:

1. "SHOW HOSTS" also displays the total leader count and the leader count
   in each space.
2. Add the "BALANCE LEADER" command, which tries to balance the leaders
   among different hosts.
---
 src/graph/BalanceExecutor.cpp                 |  64 ++++
 src/graph/BalanceExecutor.h                   |  38 +++
 src/graph/CMakeLists.txt                      |   1 +
 src/graph/Executor.cpp                        |   4 +
 src/graph/ShowExecutor.cpp                    |  49 ++-
 src/graph/test/SchemaTest.cpp                 |  13 +-
 src/interface/meta.thrift                     |   6 +
 src/interface/storage.thrift                  |  11 +-
 src/kvstore/Common.h                          |   1 +
 src/kvstore/KVEngine.h                        |   1 +
 src/kvstore/KVStore.h                         |   3 +
 src/kvstore/LogEncoder.cpp                    |  30 ++
 src/kvstore/LogEncoder.h                      |   3 +
 src/kvstore/NebulaStore.cpp                   |  18 +-
 src/kvstore/NebulaStore.h                     |   5 +-
 src/kvstore/Part.cpp                          |  29 +-
 src/kvstore/Part.h                            |  16 +
 src/kvstore/PartManager.h                     |   1 +
 src/kvstore/plugins/hbase/HBaseStore.cpp      |   6 +
 src/kvstore/plugins/hbase/HBaseStore.h        |   3 +
 src/kvstore/raftex/RaftPart.cpp               |  81 ++++-
 src/kvstore/raftex/RaftPart.h                 |   6 +
 src/kvstore/raftex/test/CMakeLists.txt        |   9 +-
 .../raftex/test/LeaderTransferTest.cpp        | 105 +++++++
 src/kvstore/raftex/test/RaftexTestBase.cpp    |  18 +-
 src/kvstore/raftex/test/RaftexTestBase.h      |   4 +-
 src/kvstore/raftex/test/TestShard.cpp         |  20 ++
 src/kvstore/raftex/test/TestShard.h           |  12 +
 src/kvstore/test/NebulaStoreTest.cpp          | 124 ++++++++
 src/meta/CMakeLists.txt                       |   1 +
 src/meta/MetaServiceHandler.cpp               |   9 +-
 src/meta/MetaServiceHandler.h                 |   9 +-
 src/meta/client/MetaClient.cpp                |  34 +--
 src/meta/client/MetaClient.h                  |   6 +-
 src/meta/processors/admin/AdminClient.cpp     | 208 ++++++++-----
 src/meta/processors/admin/AdminClient.h       |   7 +
 src/meta/processors/admin/Balancer.cpp        | 284 +++++++++++++++++-
 src/meta/processors/admin/Balancer.h          |  40 ++-
 .../admin/LeaderBalanceProcessor.cpp          |  24 ++
 .../processors/admin/LeaderBalanceProcessor.h |  32 ++
 .../partsMan/ListHostsProcessor.cpp           |  85 +++++-
 .../processors/partsMan/ListHostsProcessor.h  |  13 +-
 src/meta/test/BalanceIntegrationTest.cpp      |  86 ++++++
 src/meta/test/BalancerTest.cpp                | 229 +++++++++++++-
 src/meta/test/CMakeLists.txt                  |   1 +
 src/meta/test/MetaClientTest.cpp              |  12 +-
 src/meta/test/TestUtils.h                     |  22 +-
 src/parser/AdminSentences.cpp                 |  10 +
 src/parser/AdminSentences.h                   |  23 ++
 src/parser/Sentence.h                         |   1 +
 src/parser/parser.yy                          |  10 +-
 src/parser/scanner.lex                        |   4 +
 src/parser/test/ParserTest.cpp                |   9 +
 src/parser/test/ScannerTest.cpp               |   6 +
 src/storage/AdminProcessor.h                  |  59 +++-
 src/storage/StorageServiceHandler.cpp         |   6 +
 src/storage/StorageServiceHandler.h           |   3 +
 57 files changed, 1739 insertions(+), 175 deletions(-)
 create mode 100644 src/graph/BalanceExecutor.cpp
 create mode 100644 src/graph/BalanceExecutor.h
 create mode 100644 src/kvstore/raftex/test/LeaderTransferTest.cpp
 create mode 100644 src/meta/processors/admin/LeaderBalanceProcessor.cpp
 create mode 100644 src/meta/processors/admin/LeaderBalanceProcessor.h
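The strategy described above is a simple greedy loop. As a rough standalone
illustration of the idea (a hypothetical sketch, not code from this patch),
assuming only a per-host leader count:

    #include <algorithm>
    #include <iostream>
    #include <vector>

    // Keep moving one leader from the most-loaded host to the least-loaded
    // one until every host holds either floor(avg) or ceil(avg) leaders.
    void balanceLeaders(std::vector<int>& leadersPerHost) {
        while (true) {
            auto mm = std::minmax_element(leadersPerHost.begin(), leadersPerHost.end());
            if (*mm.second - *mm.first <= 1) {
                break;          // balanced within one leader
            }
            --(*mm.second);     // take a leader from the busiest host
            ++(*mm.first);      // give it to the idlest host
        }
    }

    int main() {
        std::vector<int> hosts{7, 1, 1};             // leader counts per host
        balanceLeaders(hosts);
        for (int n : hosts) std::cout << n << ' ';   // prints "3 3 3"
        return 0;
    }

The real balancer below additionally has to respect partition placement: a
leader may only move to a host that holds a replica of that partition.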
diff --git a/src/graph/BalanceExecutor.cpp b/src/graph/BalanceExecutor.cpp
new file mode 100644
index 00000000000..4602bbb2a89
--- /dev/null
+++ b/src/graph/BalanceExecutor.cpp
@@ -0,0 +1,64 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "graph/BalanceExecutor.h"
+
+namespace nebula {
+namespace graph {
+
+BalanceExecutor::BalanceExecutor(Sentence *sentence,
+                                 ExecutionContext *ectx) : Executor(ectx) {
+    sentence_ = static_cast<BalanceSentence*>(sentence);
+}
+
+Status BalanceExecutor::prepare() {
+    return Status::OK();
+}
+
+void BalanceExecutor::execute() {
+    auto showType = sentence_->subType();
+    switch (showType) {
+        case BalanceSentence::SubType::kLeader:
+            balanceLeader();
+            break;
+        case BalanceSentence::SubType::kUnknown:
+            onError_(Status::Error("Type unknown"));
+            break;
+    }
+}
+
+void BalanceExecutor::balanceLeader() {
+    auto future = ectx()->getMetaClient()->balanceLeader();
+    auto *runner = ectx()->rctx()->runner();
+
+    auto cb = [this] (auto &&resp) {
+        if (!resp.ok()) {
+            DCHECK(onError_);
+            onError_(std::move(resp).status());
+            return;
+        }
+        auto ret = std::move(resp).value();
+        if (!ret) {
+            DCHECK(onError_);
+            onError_(Status::Error("Balance leader failed"));
+            return;
+        }
+        DCHECK(onFinish_);
+        onFinish_();
+    };
+
+    auto error = [this] (auto &&e) {
+        LOG(ERROR) << "Exception caught: " << e.what();
+        DCHECK(onError_);
+        onError_(Status::Error("Internal error"));
+        return;
+    };
+
+    std::move(future).via(runner).thenValue(cb).thenError(error);
+}
+
+}   // namespace graph
+}   // namespace nebula
diff --git a/src/graph/BalanceExecutor.h b/src/graph/BalanceExecutor.h
new file mode 100644
index 00000000000..4b10bc6d9dd
--- /dev/null
+++ b/src/graph/BalanceExecutor.h
@@ -0,0 +1,38 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef GRAPH_BALANCEEXECUTOR_H_
+#define GRAPH_BALANCEEXECUTOR_H_
+
+#include "base/Base.h"
+#include "graph/Executor.h"
+
+namespace nebula {
+namespace graph {
+
+class BalanceExecutor final : public Executor {
+public:
+    BalanceExecutor(Sentence *sentence, ExecutionContext *ectx);
+
+    const char* name() const override {
+        return "BalanceExecutor";
+    }
+
+    Status MUST_USE_RESULT prepare() override;
+
+    void execute() override;
+
+    void balanceLeader();
+
+private:
+    BalanceSentence *sentence_{nullptr};
+    std::unique_ptr<cpp2::ExecutionResponse> resp_;
+};
+
+}   // namespace graph
+}   // namespace nebula
+
+#endif  // GRAPH_BALANCEEXECUTOR_H_
diff --git a/src/graph/CMakeLists.txt b/src/graph/CMakeLists.txt
index 7d20bb11161..9aa9e3ebfc6 100644
--- a/src/graph/CMakeLists.txt
+++ b/src/graph/CMakeLists.txt
@@ -37,6 +37,7 @@ add_library(
     OrderByExecutor.cpp
     IngestExecutor.cpp
     ConfigExecutor.cpp
+    BalanceExecutor.cpp
    SchemaHelper.cpp
     FetchVerticesExecutor.cpp
     FetchEdgesExecutor.cpp
diff --git a/src/graph/Executor.cpp b/src/graph/Executor.cpp
index 683d0633c4d..65b0c26ce05 100644
--- a/src/graph/Executor.cpp
+++ b/src/graph/Executor.cpp
@@ -41,6 +41,7 @@
 #include "graph/SetExecutor.h"
 #include "graph/FindExecutor.h"
 #include "graph/MatchExecutor.h"
+#include "graph/BalanceExecutor.h"
 
 namespace nebula {
 namespace graph {
@@ -139,6 +140,9 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) {
         case Sentence::Kind::kFind:
             executor = std::make_unique<FindExecutor>(sentence, ectx());
             break;
+        case Sentence::Kind::kBalance:
+            executor = std::make_unique<BalanceExecutor>(sentence, ectx());
+            break;
         case Sentence::Kind::kUnknown:
             LOG(FATAL) << "Sentence kind unknown";
             break;
diff --git a/src/graph/ShowExecutor.cpp b/src/graph/ShowExecutor.cpp
index 35045295679..a3fdf0465ff 100644
--- a/src/graph/ShowExecutor.cpp
+++ b/src/graph/ShowExecutor.cpp
@@ -82,18 +82,53 @@ void ShowExecutor::showHosts() {
             return;
         }
 
-        auto retShowHosts = std::move(resp).value();
+        auto hostItems = std::move(resp).value();
         std::vector<cpp2::RowValue> rows;
-        std::vector<std::string> header{"Ip", "Port", "Status"};
+        std::vector<std::string> header{"Ip", "Port", "Status", "Leader count",
+                                        "Leader distribution", "Partition distribution"};
         resp_ = std::make_unique<cpp2::ExecutionResponse>();
         resp_->set_column_names(std::move(header));
 
-        for (auto &status : retShowHosts) {
+        for (auto& item : hostItems) {
             std::vector<cpp2::ColumnValue> row;
-            row.resize(3);
-            row[0].set_str(NetworkUtils::ipFromHostAddr(status.first));
-            row[1].set_str(folly::to<std::string>(NetworkUtils::portFromHostAddr(status.first)));
-            row[2].set_str(status.second);
+            row.resize(6);
+            auto hostAddr = HostAddr(item.hostAddr.ip, item.hostAddr.port);
+            row[0].set_str(NetworkUtils::ipFromHostAddr(hostAddr));
+            row[1].set_str(folly::to<std::string>(NetworkUtils::portFromHostAddr(hostAddr)));
+            switch (item.get_status()) {
+                case meta::cpp2::HostStatus::ONLINE:
+                    row[2].set_str("online");
+                    break;
+                case meta::cpp2::HostStatus::OFFLINE:
+                case meta::cpp2::HostStatus::UNKNOWN:
+                    row[2].set_str("offline");
+                    break;
+            }
+
+            int32_t leaderCount = 0;
+            std::string leaders;
+            for (auto& spaceEntry : item.get_leader_parts()) {
+                leaderCount += spaceEntry.second.size();
+                leaders += "space " + folly::to<std::string>(spaceEntry.first) + ": " +
+                           folly::to<std::string>(spaceEntry.second.size()) + ", ";
+            }
+            if (!leaders.empty()) {
+                leaders.resize(leaders.size() - 2);
+            }
+
+            row[3].set_integer(leaderCount);
+            row[4].set_str(leaders);
+
+            std::string parts;
+            for (auto& spaceEntry : item.get_all_parts()) {
+                parts += "space " + folly::to<std::string>(spaceEntry.first) + ": " +
+                         folly::to<std::string>(spaceEntry.second.size()) + ", ";
+            }
+            if (!parts.empty()) {
+                parts.resize(parts.size() - 2);
+            }
+            row[5].set_str(parts);
+
             rows.emplace_back();
             rows.back().set_columns(std::move(row));
         }
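The "Leader distribution" and "Partition distribution" cells built above are
comma-separated "space <id>: <count>" pairs; a host leading five parts in
space 1 and three in space 2 renders as "space 1: 5, space 2: 3". A minimal
standard-library sketch of the same formatting (a hypothetical helper, shown
for clarity only):

    #include <map>
    #include <string>

    // Join "space <id>: <count>" pairs and trim the trailing ", ",
    // mirroring the string building in ShowExecutor::showHosts().
    std::string formatDistribution(const std::map<int, int>& partsPerSpace) {
        std::string out;
        for (const auto& entry : partsPerSpace) {
            out += "space " + std::to_string(entry.first) + ": " +
                   std::to_string(entry.second) + ", ";
        }
        if (!out.empty()) {
            out.resize(out.size() - 2);   // drop the trailing ", "
        }
        return out;
    }
    // formatDistribution({{1, 5}, {2, 3}}) == "space 1: 5, space 2: 3"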
diff --git a/src/graph/test/SchemaTest.cpp b/src/graph/test/SchemaTest.cpp
index 5af4c465a8e..339bc7f42b0 100644
--- a/src/graph/test/SchemaTest.cpp
+++ b/src/graph/test/SchemaTest.cpp
@@ -52,8 +52,9 @@ TEST_F(SchemaTest, metaCommunication) {
         cpp2::ExecutionResponse resp;
         std::string query = "SHOW HOSTS";
         client->execute(query, resp);
-        std::vector<std::tuple<std::string, std::string, std::string>> expected{
-            {"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online"},
+        std::vector<std::tuple<std::string, std::string, std::string,
+                               int, std::string, std::string>> expected {
+            {"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online", 0, "", ""},
         };
         ASSERT_TRUE(verifyResult(resp, expected));
     }
@@ -254,7 +255,7 @@ TEST_F(SchemaTest, metaCommunication) {
     // Test existent tag
     {
         cpp2::ExecutionResponse resp;
-        std::string query = "CREATE TAG person(id int, balance double)";
+        std::string query = "CREATE TAG person(id int)";
         auto code = client->execute(query, resp);
         ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code);
     }
@@ -640,6 +641,7 @@ TEST_F(SchemaTest, metaCommunication) {
         client->execute(query, resp);
         ASSERT_EQ(1, (*(resp.get_rows())).size());
     }
+
     sleep(FLAGS_load_data_interval_secs + 1);
 }
@@ -650,8 +652,9 @@ TEST_F(SchemaTest, TTLtest) {
         cpp2::ExecutionResponse resp;
         std::string query = "SHOW HOSTS";
         client->execute(query, resp);
-        std::vector<std::tuple<std::string, std::string, std::string>> expected{
-            {"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online"},
+        std::vector<std::tuple<std::string, std::string, std::string,
+                               int, std::string, std::string>> expected {
+            {"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online", 0, "", ""},
         };
         ASSERT_TRUE(verifyResult(resp, expected));
     }
diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift
index e4de7a3372d..484d429b0dd 100644
--- a/src/interface/meta.thrift
+++ b/src/interface/meta.thrift
@@ -121,6 +121,8 @@ enum HostStatus {
 struct HostItem {
     1: common.HostAddr hostAddr,
     2: HostStatus      status,
+    3: map<common.GraphSpaceID, list<common.PartitionID>> (cpp.template = "std::unordered_map") leader_parts,
+    4: map<common.GraphSpaceID, list<common.PartitionID>> (cpp.template = "std::unordered_map") all_parts,
 }
 
 struct UserItem {
@@ -437,6 +439,9 @@ struct BalanceResp {
     3: common.HostAddr leader,
 }
 
+struct LeaderBalanceReq {
+}
+
 enum ConfigModule {
     UNKNOWN = 0x00,
     ALL     = 0x01,
@@ -540,6 +545,7 @@ service MetaService {
     HBResp heartBeat(1: HBReq req);
 
     BalanceResp balance(1: BalanceReq req);
+    ExecResp leaderBalance(1: LeaderBalanceReq req);
 
     ExecResp regConfig(1: RegConfigReq req);
     GetConfigResp getConfig(1: GetConfigReq req);
diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift
index 170ce35b770..28d8916b286 100644
--- a/src/interface/storage.thrift
+++ b/src/interface/storage.thrift
@@ -199,10 +199,18 @@ struct AddLearnerReq {
 
 struct CatchUpDataReq {
     1: common.GraphSpaceID space_id,
-    2: common.GraphSpaceID part_id,
+    2: common.PartitionID  part_id,
     3: common.HostAddr     target,
 }
 
+struct GetLeaderReq {
+}
+
+struct GetLeaderResp {
+    1: ErrorCode code,
+    2: map<common.GraphSpaceID, list<common.PartitionID>> (cpp.template = "std::unordered_map") leader_parts;
+}
+
 service StorageService {
     QueryResponse getOutBound(1: GetNeighborsRequest req)
@@ -225,5 +233,6 @@ service StorageService {
     AdminExecResp waitingForCatchUpData(1: CatchUpDataReq req);
     AdminExecResp removePart(1: RemovePartReq req);
     AdminExecResp memberChange(1: MemberChangeReq req);
+    GetLeaderResp getLeaderPart(1: GetLeaderReq req);
 }
diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h
index b1e6df44334..7df1135f8df 100644
--- a/src/kvstore/Common.h
+++ b/src/kvstore/Common.h
@@ -42,6 +42,7 @@ class KVFilter {
 
 using KV = std::pair<std::string, std::string>;
 using KVCallback = folly::Function<void(ResultCode code)>;
+using NewLeaderCallback = folly::Function<void(HostAddr nLeader)>;
 
 inline rocksdb::Slice toSlice(const folly::StringPiece& str) {
     return rocksdb::Slice(str.begin(), str.size());
diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h
index 0eeae367a96..cc784e93c37 100644
--- a/src/kvstore/KVEngine.h
+++ b/src/kvstore/KVEngine.h
@@ -86,6 +86,7 @@ class KVEngine {
     // Remove partId from current storage engine.
     virtual void removePart(PartitionID partId) = 0;
 
+    // Return all partIds current storage engine holds.
     virtual std::vector<PartitionID> allParts() = 0;
diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h
index 308f8626cb0..6b901af7a92 100644
--- a/src/kvstore/KVStore.h
+++ b/src/kvstore/KVStore.h
@@ -144,6 +144,9 @@ class KVStore {
 
     virtual ResultCode ingest(GraphSpaceID spaceId) = 0;
 
+    virtual int32_t allLeader(std::unordered_map<GraphSpaceID,
+                                                 std::vector<PartitionID>>& leaderIds) = 0;
+
     virtual ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID spaceId,
                                                             PartitionID partId) = 0;
diff --git a/src/kvstore/LogEncoder.cpp b/src/kvstore/LogEncoder.cpp
index 450f5f55257..c4324feaec1 100644
--- a/src/kvstore/LogEncoder.cpp
+++ b/src/kvstore/LogEncoder.cpp
@@ -153,6 +153,10 @@ std::string encodeLearner(const HostAddr& learner) {
     // Log type
     auto type = LogType::OP_ADD_LEARNER;
     encoded.append(reinterpret_cast<char*>(&type), 1);
+    // Value length
+    uint32_t len = static_cast<uint32_t>(sizeof(HostAddr));
+    encoded.append(reinterpret_cast<char*>(&len), sizeof(len));
+    // Learner addr
     encoded.append(reinterpret_cast<const char*>(&learner), sizeof(HostAddr));
     return encoded;
 }
@@ -167,6 +171,32 @@ HostAddr decodeLearner(const std::string& encoded) {
     return addr;
 }
 
+std::string encodeTransLeader(const HostAddr& targetAddr) {
+    std::string encoded;
+    encoded.reserve(kHeadLen + sizeof(HostAddr));
+    // Timestamp (8 bytes)
+    int64_t ts = time::WallClock::fastNowInMilliSec();
+    encoded.append(reinterpret_cast<char*>(&ts), sizeof(int64_t));
+    // Log type
+    auto type = LogType::OP_TRANS_LEADER;
+    encoded.append(reinterpret_cast<char*>(&type), 1);
+    // Value length
+    uint32_t len = static_cast<uint32_t>(sizeof(HostAddr));
+    encoded.append(reinterpret_cast<char*>(&len), sizeof(len));
+    // Target addr
+    encoded.append(reinterpret_cast<const char*>(&targetAddr), sizeof(HostAddr));
+    return encoded;
+}
+
+HostAddr decodeTransLeader(folly::StringPiece encoded) {
+    HostAddr addr;
+    CHECK_EQ(kHeadLen + sizeof(HostAddr), encoded.size());
+    memcpy(&addr.first, encoded.begin() + kHeadLen, sizeof(addr.first));
+    memcpy(&addr.second,
+           encoded.begin() + kHeadLen + sizeof(addr.first),
+           sizeof(addr.second));
+    return addr;
+}
+
 }  // namespace kvstore
 }  // namespace nebula
diff --git a/src/kvstore/LogEncoder.h b/src/kvstore/LogEncoder.h
index 4d656770b2c..e3232761f45 100644
--- a/src/kvstore/LogEncoder.h
+++ b/src/kvstore/LogEncoder.h
@@ -20,6 +20,7 @@ enum LogType : char {
     OP_REMOVE_PREFIX    = 0x5,
     OP_REMOVE_RANGE     = 0x6,
     OP_ADD_LEARNER      = 0x07,
+    OP_TRANS_LEADER     = 0x08,
 };
 
 
@@ -36,6 +37,8 @@ std::vector<std::string> decodeMultiValues(folly::StringPiece encoded);
 std::string encodeLearner(const HostAddr& learner);
 HostAddr decodeLearner(const std::string& encoded);
 
+std::string encodeTransLeader(const HostAddr& targetAddr);
+HostAddr decodeTransLeader(folly::StringPiece encoded);
 }  // namespace kvstore
 }  // namespace nebula
 #endif  // KVSTORE_LOGENCODER_H_
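encodeLearner() and the new encodeTransLeader()/decodeTransLeader() above
share one on-wire layout: an 8-byte wall-clock timestamp, a 1-byte log type,
a 4-byte value length (kHeadLen covers these three fields), then the raw
HostAddr bytes. A self-contained round-trip sketch of that layout, under the
assumption that HostAddr is a pair of 32-bit ip/port values:

    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <utility>

    using HostAddr = std::pair<uint32_t, uint32_t>;   // ip, port (assumed)
    constexpr size_t kHeadLen = sizeof(int64_t) + 1 + sizeof(uint32_t);

    std::string encode(char type, int64_t ts, const HostAddr& addr) {
        std::string buf;
        buf.append(reinterpret_cast<char*>(&ts), sizeof(ts));      // timestamp
        buf.append(&type, 1);                                      // log type
        uint32_t len = sizeof(HostAddr);
        buf.append(reinterpret_cast<char*>(&len), sizeof(len));    // value length
        buf.append(reinterpret_cast<const char*>(&addr), sizeof(addr));
        return buf;
    }

    HostAddr decode(const std::string& buf) {
        assert(buf.size() == kHeadLen + sizeof(HostAddr));
        HostAddr addr;
        memcpy(&addr, buf.data() + kHeadLen, sizeof(addr));        // skip the header
        return addr;
    }

    int main() {
        HostAddr target{0x7f000001, 44500};
        assert(decode(encode(0x08 /* OP_TRANS_LEADER */, 12345, target)) == target);
        return 0;
    }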
diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp
index 2dff1c46716..d3fcd58235e 100644
--- a/src/kvstore/NebulaStore.cpp
+++ b/src/kvstore/NebulaStore.cpp
@@ -404,7 +404,6 @@ ErrorOr<ResultCode, std::shared_ptr<Part>> NebulaStore::part(GraphSpaceID spaceId,
     return partIt->second;
 }
 
-
 ResultCode NebulaStore::ingest(GraphSpaceID spaceId) {
     auto spaceRet = space(spaceId);
     if (!ok(spaceRet)) {
@@ -547,6 +546,23 @@ ErrorOr<ResultCode, std::shared_ptr<SpacePartInfo>> NebulaStore::space(GraphSpaceID spaceId) {
     return it->second;
 }
 
+int32_t NebulaStore::allLeader(std::unordered_map<GraphSpaceID,
+                                                  std::vector<PartitionID>>& leaderIds) {
+    folly::RWSpinLock::ReadHolder rh(&lock_);
+    int32_t count = 0;
+    for (const auto& spaceIt : spaces_) {
+        auto spaceId = spaceIt.first;
+        for (const auto& partIt : spaceIt.second->parts_) {
+            auto partId = partIt.first;
+            if (partIt.second->isLeader()) {
+                leaderIds[spaceId].emplace_back(partId);
+                ++count;
+            }
+        }
+    }
+    return count;
+}
+
 }  // namespace kvstore
 }  // namespace nebula
diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h
index c1187cab7cb..346bacb350d 100644
--- a/src/kvstore/NebulaStore.h
+++ b/src/kvstore/NebulaStore.h
@@ -30,11 +30,11 @@ struct SpacePartInfo {
     std::vector<std::unique_ptr<KVEngine>> engines_;
 };
 
-
 class NebulaStore : public KVStore, public Handler {
     FRIEND_TEST(NebulaStoreTest, SimpleTest);
     FRIEND_TEST(NebulaStoreTest, PartsTest);
     FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest);
+    FRIEND_TEST(NebulaStoreTest, TransLeaderTest);
 
 public:
     NebulaStore(KVOptions options,
@@ -160,6 +160,9 @@ class NebulaStore : public KVStore, public Handler {
 
     ResultCode flush(GraphSpaceID spaceId) override;
 
+    int32_t allLeader(std::unordered_map<GraphSpaceID,
+                                         std::vector<PartitionID>>& leaderIds) override;
+
     bool isLeader(GraphSpaceID spaceId, PartitionID partId);
 
 private:
diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp
index e3e59a9bc76..cd77387d596 100644
--- a/src/kvstore/Part.cpp
+++ b/src/kvstore/Part.cpp
@@ -148,6 +148,14 @@ void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) {
     });
 }
 
+void Part::asyncTransferLeader(const HostAddr& target, KVCallback cb) {
+    std::string log = encodeTransLeader(target);
+    sendCommandAsync(std::move(log))
+        .then([callback = std::move(cb)] (AppendLogResult res) mutable {
+            callback(toResultCode(res));
+        });
+}
+
 void Part::onLostLeadership(TermID term) {
     VLOG(1) << "Lost the leadership for the term " << term;
 }
@@ -157,6 +165,13 @@ void Part::onElected(TermID term) {
     VLOG(1) << "Being elected as the leader for the term " << term;
 }
 
+void Part::onDiscoverNewLeader(HostAddr nLeader) {
+    LOG(INFO) << idStr_ << "Found the new leader " << nLeader;
+    if (newLeaderCb_) {
+        newLeaderCb_(nLeader);
+    }
+}
+
 bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
     auto batch = engine_->startBatchWrite();
     LogID lastId = -1;
@@ -232,6 +247,12 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
             case OP_ADD_LEARNER: {
                 break;
             }
+            case OP_TRANS_LEADER: {
+                auto newLeader = decodeTransLeader(log);
+                commitTransLeader(newLeader);
+                LOG(INFO) << idStr_ << "Transfer leader to " << newLeader;
+                break;
+            }
             default: {
                 LOG(FATAL) << "Unknown operation: " << static_cast<int32_t>(log[0]);
             }
@@ -263,7 +284,13 @@ bool Part::preProcessLog(LogID logId,
         case OP_ADD_LEARNER: {
             auto learner = decodeLearner(log);
             addLearner(learner);
-            LOG(INFO) << idStr_ << "Add learner " << learner;
+            LOG(INFO) << idStr_ << "Preprocess add learner " << learner;
+            break;
+        }
+        case OP_TRANS_LEADER: {
+            auto newLeader = decodeTransLeader(log);
+            preProcessTransLeader(newLeader);
+            LOG(INFO) << idStr_ << "Preprocess transfer leader to " << newLeader;
             break;
         }
         default: {
diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h
index d81985ff2e1..f9dd5526734 100644
--- a/src/kvstore/Part.h
+++ b/src/kvstore/Part.h
@@ -15,6 +15,7 @@ namespace nebula {
 namespace kvstore {
 
+
 class Part : public raftex::RaftPart {
 public:
     Part(GraphSpaceID spaceId,
@@ -47,6 +48,18 @@ class Part : public raftex::RaftPart {
     void asyncAtomicOp(raftex::AtomicOp op, KVCallback cb);
 
     void asyncAddLearner(const HostAddr& learner, KVCallback cb);
+
+    void asyncTransferLeader(const HostAddr& target, KVCallback cb);
+
+    void registerNewLeaderCb(NewLeaderCallback cb) {
+        newLeaderCb_ = std::move(cb);
+    }
+
+    void unRegisterNewLeaderCb() {
+        newLeaderCb_ = nullptr;
+    }
+
+private:
     /**
      * Methods inherited from RaftPart
      */
@@ -56,6 +69,8 @@ class Part : public raftex::RaftPart {
 
     void onElected(TermID term) override;
 
+    void onDiscoverNewLeader(HostAddr nLeader) override;
+
     bool commitLogs(std::unique_ptr<LogIterator> iter) override;
 
     bool preProcessLog(LogID logId,
@@ -68,6 +83,7 @@ class Part : public raftex::RaftPart {
     PartitionID partId_;
     std::string walPath_;
     KVEngine* engine_ = nullptr;
+    NewLeaderCallback newLeaderCb_ = nullptr;
 };
 
 }  // namespace kvstore
diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h
index f1d97a08d7a..0f0d2715e09 100644
--- a/src/kvstore/PartManager.h
+++ b/src/kvstore/PartManager.h
@@ -72,6 +72,7 @@ class MemPartManager final : public PartManager {
     FRIEND_TEST(NebulaStoreTest, SimpleTest);
     FRIEND_TEST(NebulaStoreTest, PartsTest);
     FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest);
+    FRIEND_TEST(NebulaStoreTest, TransLeaderTest);
 
 public:
     MemPartManager() = default;
diff --git a/src/kvstore/plugins/hbase/HBaseStore.cpp b/src/kvstore/plugins/hbase/HBaseStore.cpp
index 060c885b3af..532ce6574a6 100644
--- a/src/kvstore/plugins/hbase/HBaseStore.cpp
+++ b/src/kvstore/plugins/hbase/HBaseStore.cpp
@@ -384,6 +384,12 @@ ResultCode HBaseStore::ingest(GraphSpaceID spaceId) {
     return ResultCode::ERR_UNSUPPORTED;
 }
 
+int32_t HBaseStore::allLeader(std::unordered_map<GraphSpaceID,
+                                                 std::vector<PartitionID>>& leaderIds) {
+    UNUSED(leaderIds);
+    LOG(FATAL) << "Unimplemented";
+}
+
 }  // namespace kvstore
 }  // namespace nebula
diff --git a/src/kvstore/plugins/hbase/HBaseStore.h b/src/kvstore/plugins/hbase/HBaseStore.h
index c59cd345053..ae879df78dc 100644
--- a/src/kvstore/plugins/hbase/HBaseStore.h
+++ b/src/kvstore/plugins/hbase/HBaseStore.h
@@ -135,6 +135,9 @@ class HBaseStore : public KVStore {
 
     ResultCode ingest(GraphSpaceID spaceId) override;
 
+    int32_t allLeader(std::unordered_map<GraphSpaceID,
+                                         std::vector<PartitionID>>& leaderIds) override;
+
     ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID,
                                                     PartitionID) override {
         return ResultCode::ERR_UNSUPPORTED;
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index 01663b6d2d2..e2d3e88ce43 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -366,6 +366,55 @@ void RaftPart::addLearner(const HostAddr& addr) {
     }
 }
 
+void RaftPart::preProcessTransLeader(const HostAddr& target) {
+    CHECK(!raftLock_.try_lock());
+    LOG(INFO) << idStr_ << "Pre-process transfer leader to " << target;
+    switch (role_) {
+        case Role::FOLLOWER: {
+            if (target != addr_ && target != HostAddr(0, 0)) {
+                LOG(INFO) << idStr_ << "I am follower, just wait for the new leader.";
+            } else {
+                LOG(INFO) << idStr_ << "I will be the new leader, trigger leader election now!";
+                role_ = Role::CANDIDATE;
+                bgWorkers_->addTask([self = shared_from_this()] {
+                    self->leaderElection();
+                });
+            }
+            break;
+        }
+        default: {
+            LOG(INFO) << idStr_ << "My role is " << roleStr(role_)
+                      << ", so do nothing when pre process transfer leader";
+            break;
+        }
+    }
+}
+
+void RaftPart::commitTransLeader(const HostAddr& target) {
+    CHECK(!raftLock_.try_lock());
+    LOG(INFO) << idStr_ << "Commit transfer leader to " << target;
+    switch (role_) {
+        case Role::LEADER: {
+            if (target != addr_) {
+                lastMsgRecvDur_.reset();
+                role_ = Role::FOLLOWER;
+                leader_ = HostAddr(0, 0);
+                LOG(INFO) << idStr_ << "Give up my leadership!";
+            } else {
+                LOG(INFO) << idStr_ << "I am already the leader!";
+            }
+            break;
+        }
+        case Role::FOLLOWER:
+        case Role::CANDIDATE:
+        case Role::LEARNER: {
+            CHECK(target != addr_);
+            LOG(INFO) << idStr_ << "I am " << roleStr(role_) << ", just wait for the new leader!";
+            break;
+        }
+    }
+}
+
 folly::Future<AppendLogResult> RaftPart::appendAsync(ClusterID source,
                                                      std::string log) {
     if (source < 0) {
@@ -784,6 +833,9 @@ bool RaftPart::needToStartElection() {
         (lastMsgRecvDur_.elapsedInSec() >= FLAGS_raft_heartbeat_interval_secs ||
          term_ == 0)) {
         role_ = Role::CANDIDATE;
+        LOG(INFO) << idStr_
+                  << "needToStartElection: lastMsgRecvDur " << lastMsgRecvDur_.elapsedInSec()
+                  << ", term_ " << term_;
     }
 
     return role_ == Role::CANDIDATE;
@@ -1078,6 +1130,11 @@ void RaftPart::processAskForVoteRequest(
         });
     }
 
+    LOG(INFO) << idStr_ << "I was " << roleStr(oldRole)
+              << ", discover the new leader " << leader_;
+    bgWorkers_->addTask([self = shared_from_this()] {
+        self->onDiscoverNewLeader(self->leader_);
+    });
     return;
 }
@@ -1128,10 +1185,6 @@ void RaftPart::processAppendLogRequest(
         resp.set_error_code(cpp2::ErrorCode::E_NOT_READY);
         return;
     }
-
-    TermID oldTerm = term_;
-    Role oldRole = role_;
-
     // Check leadership
     cpp2::ErrorCode err = verifyLeader(req, g);
     if (err != cpp2::ErrorCode::SUCCEEDED) {
@@ -1238,14 +1291,6 @@ void RaftPart::processAppendLogRequest(
     }
 
     resp.set_error_code(cpp2::ErrorCode::SUCCEEDED);
-
-    if (oldRole == Role::LEADER) {
-        // Need to invoke onLostLeadership callback
-        VLOG(2) << idStr_ << "Was a leader, need to do some clean-up";
-        bgWorkers_->addTask([self = shared_from_this(), oldTerm] {
-            self->onLostLeadership(oldTerm);
-        });
-    }
 }
@@ -1294,6 +1339,8 @@ cpp2::ErrorCode RaftPart::verifyLeader(
         }
     }
 
+    Role oldRole = role_;
+    TermID oldTerm = term_;
     // Ok, no reason to refuse, just follow the leader
     LOG(INFO) << idStr_ << "The current role is " << roleStr(role_)
               << ". Will follow the new leader "
     leader_ = std::make_pair(req.get_leader_ip(),
                              req.get_leader_port());
     term_ = proposedTerm_ = req.get_current_term();
+    if (oldRole == Role::LEADER) {
+        // Need to invoke onLostLeadership callback
+        VLOG(2) << idStr_ << "Was a leader, need to do some clean-up";
+        bgWorkers_->addTask([self = shared_from_this(), oldTerm] {
+            self->onLostLeadership(oldTerm);
+        });
+    }
+
+    bgWorkers_->addTask([self = shared_from_this()] {
+        self->onDiscoverNewLeader(self->leader_);
+    });
     return cpp2::ErrorCode::SUCCEEDED;
 }
diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h
index 8ea36edd367..59faf970acf 100644
--- a/src/kvstore/raftex/RaftPart.h
+++ b/src/kvstore/raftex/RaftPart.h
@@ -122,6 +122,10 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
 
     void addLearner(const HostAddr& learner);
 
+    void commitTransLeader(const HostAddr& target);
+
+    void preProcessTransLeader(const HostAddr& target);
+
     // Change the partition status to RUNNING. This is called
     // by the inherited class, when it's ready to serve
     virtual void start(std::vector<HostAddr>&& peers, bool asLearner = false);
@@ -205,6 +209,8 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
     // a new leader
     virtual void onElected(TermID term) = 0;
 
+    virtual void onDiscoverNewLeader(HostAddr nLeader) = 0;
+
     // The inherited classes need to implement this method to commit
     // a batch of log messages
     virtual bool commitLogs(std::unique_ptr<LogIterator> iter) = 0;
diff --git a/src/kvstore/raftex/test/CMakeLists.txt b/src/kvstore/raftex/test/CMakeLists.txt
index 89401af4844..269e3447b16 100644
--- a/src/kvstore/raftex/test/CMakeLists.txt
+++ b/src/kvstore/raftex/test/CMakeLists.txt
@@ -50,10 +50,17 @@ nebula_add_test(
     LIBRARIES ${THRIFT_LIBRARIES} wangle gtest
 )
 
-
 nebula_add_test(
     NAME raft_case_test
     SOURCES RaftCase.cpp RaftexTestBase.cpp TestShard.cpp
     OBJECTS ${RAFTEX_TEST_LIBS}
     LIBRARIES ${THRIFT_LIBRARIES} wangle gtest
 )
+
+nebula_add_test(
+    NAME leader_transfer_test
+    SOURCES LeaderTransferTest.cpp RaftexTestBase.cpp TestShard.cpp
+    OBJECTS ${RAFTEX_TEST_LIBS}
+    LIBRARIES ${THRIFT_LIBRARIES} wangle gtest
+)
+
diff --git a/src/kvstore/raftex/test/LeaderTransferTest.cpp b/src/kvstore/raftex/test/LeaderTransferTest.cpp
new file mode 100644
index 00000000000..0b966a0707f
--- /dev/null
+++ b/src/kvstore/raftex/test/LeaderTransferTest.cpp
@@ -0,0 +1,105 @@
+/* Copyright (c) 2018 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "base/Base.h"
+#include <gtest/gtest.h>
+#include <folly/String.h>
+#include "fs/TempDir.h"
+#include "fs/FileUtils.h"
+#include "thread/GenericThreadPool.h"
+#include "network/NetworkUtils.h"
+#include "kvstore/raftex/RaftexService.h"
+#include "kvstore/raftex/test/RaftexTestBase.h"
+#include "kvstore/raftex/test/TestShard.h"
+
+DECLARE_uint32(heartbeat_interval);
+
+
+namespace nebula {
+namespace raftex {
+
+
+TEST(LeaderTransferTest, SimpleTest) {
+    fs::TempDir walRoot("/tmp/leader_transfer_test.simple_test.XXXXXX");
+    std::shared_ptr<thread::GenericThreadPool> workers;
+    std::vector<std::string> wals;
+    std::vector<HostAddr> allHosts;
+    std::vector<std::shared_ptr<RaftexService>> services;
+    std::vector<std::shared_ptr<test::TestShard>> copies;
+
+    std::shared_ptr<test::TestShard> leader;
+    setupRaft(3, walRoot, workers, wals, allHosts, services, copies, leader);
+
+    // Check all hosts agree on the same leader
+    auto index = checkLeadership(copies, leader);
+
+    auto nLeaderIndex = (index + 1) % 3;
+    auto f = leader->sendCommandAsync(test::encodeTransferLeader(allHosts[nLeaderIndex]));
+    f.wait();
+
+    leader.reset();
+    waitUntilLeaderElected(copies, leader);
+
+    checkLeadership(copies, nLeaderIndex, leader);
+
+    // Append 100 logs
+    LOG(INFO) << "=====> Start appending logs";
+    std::vector<std::string> msgs;
+    appendLogs(0, 99, leader, msgs);
+    LOG(INFO) << "<===== Finish appending logs";
+
+    checkConsensus(copies, 0, 99, msgs);
+    finishRaft(services, copies, workers, leader);
+}
+
+TEST(LeaderTransferTest, ChangeLeaderSeveralTimesTest) {
+    fs::TempDir walRoot("/tmp/leader_transfer_test.simple_test.XXXXXX");
+    std::shared_ptr<thread::GenericThreadPool> workers;
+    std::vector<std::string> wals;
+    std::vector<HostAddr> allHosts;
+    std::vector<std::shared_ptr<RaftexService>> services;
+    std::vector<std::shared_ptr<test::TestShard>> copies;
+
+    std::shared_ptr<test::TestShard> leader;
+    setupRaft(3, walRoot, workers, wals, allHosts, services, copies, leader);
+
+    // Check all hosts agree on the same leader
+    auto nLeaderIndex = checkLeadership(copies, leader);
+    int32_t times = 0;
+    while (++times <= 10) {
+        nLeaderIndex = (nLeaderIndex + 1) % 3;
+        LOG(INFO) << times << " ===== Let's transfer the leader to " << allHosts[nLeaderIndex];
+        auto f = leader->sendCommandAsync(test::encodeTransferLeader(allHosts[nLeaderIndex]));
+        f.wait();
+        leader.reset();
+        waitUntilLeaderElected(copies, leader);
+        checkLeadership(copies, nLeaderIndex, leader);
+    }
+
+    // Append 100 logs
+    LOG(INFO) << "=====> Start appending logs";
+    std::vector<std::string> msgs;
+    appendLogs(0, 99, leader, msgs);
+    LOG(INFO) << "<===== Finish appending logs";
+    checkConsensus(copies, 0, 99, msgs);
+    finishRaft(services, copies, workers, leader);
+}
+
+
+}  // namespace raftex
+}  // namespace nebula
+
+
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    folly::init(&argc, &argv, true);
+    google::SetStderrLogging(google::INFO);
+
+    return RUN_ALL_TESTS();
+}
+
diff --git a/src/kvstore/raftex/test/RaftexTestBase.cpp b/src/kvstore/raftex/test/RaftexTestBase.cpp
index 4146997d0cb..aae7077626c 100644
--- a/src/kvstore/raftex/test/RaftexTestBase.cpp
+++ b/src/kvstore/raftex/test/RaftexTestBase.cpp
@@ -233,16 +233,22 @@ void finishRaft(std::vector<std::shared_ptr<RaftexService>>& services,
     }
 }
 
-void checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
-                     std::shared_ptr<test::TestShard>& leader) {
+int32_t checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
+                        std::shared_ptr<test::TestShard>& leader) {
     std::lock_guard<std::mutex> lock(leaderMutex);
-
-    ASSERT_FALSE(!leader);
+    CHECK(!!leader);
+    int32_t leaderIndex = -1;
+    int i = 0;
     for (auto& c : copies) {
-        if (c != nullptr && leader != c && !c->isLearner() && c->isRunning_ == true) {
-            ASSERT_EQ(leader->address(), c->leader());
+        if (c != nullptr && leader != c && !c->isLearner() && c->isRunning_) {
+            EXPECT_EQ(leader->address(), c->leader());
+        } else if (leader == c) {
+            leaderIndex = i;
         }
+        i++;
     }
+    CHECK_GE(leaderIndex, 0);
+    return leaderIndex;
 }
 
 /**
diff --git a/src/kvstore/raftex/test/RaftexTestBase.h b/src/kvstore/raftex/test/RaftexTestBase.h
index f13a9004983..12c6c4a94b4 100644
--- a/src/kvstore/raftex/test/RaftexTestBase.h
+++ b/src/kvstore/raftex/test/RaftexTestBase.h
@@ -71,8 +71,8 @@ void finishRaft(std::vector<std::shared_ptr<RaftexService>>& services,
                 std::shared_ptr<thread::GenericThreadPool>& workers,
                 std::shared_ptr<test::TestShard>& leader);
 
-void checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
-                     std::shared_ptr<test::TestShard>& leader);
+int32_t checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
+                        std::shared_ptr<test::TestShard>& leader);
 
 void checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
                      size_t index,
diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp
index 5541f778883..c5a8b20d476 100644
--- a/src/kvstore/raftex/test/TestShard.cpp
+++ b/src/kvstore/raftex/test/TestShard.cpp
@@ -38,6 +38,21 @@ std::string compareAndSet(const std::string& log) {
     }
 }
 
+std::string encodeTransferLeader(const HostAddr& addr) {
+    std::string str;
+    CommandType type = CommandType::TRANSFER_LEADER;
+    str.append(reinterpret_cast<char*>(&type), 1);
+    str.append(reinterpret_cast<const char*>(&addr), sizeof(HostAddr));
+    return str;
+}
+
+HostAddr decodeTransferLeader(const folly::StringPiece& log) {
+    HostAddr leader;
+    memcpy(&leader.first, log.begin() + 1, sizeof(leader.first));
+    memcpy(&leader.second, log.begin() + 1 + sizeof(leader.first), sizeof(leader.second));
+    return leader;
+}
+
 TestShard::TestShard(size_t idx,
                      std::shared_ptr<RaftexService> svc,
                      PartitionID partId,
@@ -89,6 +104,11 @@ bool TestShard::commitLogs(std::unique_ptr<LogIterator> iter) {
         auto log = iter->logMsg();
         if (!log.empty()) {
             switch (static_cast<CommandType>(log[0])) {
+                case CommandType::TRANSFER_LEADER: {
+                    auto nLeader = decodeTransferLeader(log);
+                    commitTransLeader(nLeader);
+                    break;
+                }
                 case CommandType::ADD_LEARNER: {
                     break;
                 }
diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h
index 70325d5eaf1..09a4265ecbb 100644
--- a/src/kvstore/raftex/test/TestShard.h
+++ b/src/kvstore/raftex/test/TestShard.h
@@ -19,6 +19,7 @@ namespace test {
 
 enum class CommandType : int8_t {
     ADD_LEARNER = 0x01,
+    TRANSFER_LEADER = 0x02,
 };
 
 std::string encodeLearner(const HostAddr& addr);
@@ -27,6 +28,10 @@ HostAddr decodeLearner(const folly::StringPiece& log);
 
 std::string compareAndSet(const std::string& log);
 
+std::string encodeTransferLeader(const HostAddr& addr);
+
+HostAddr decodeTransferLeader(const folly::StringPiece& log);
+
 class TestShard : public RaftPart {
 public:
     TestShard(
@@ -57,6 +62,7 @@ class TestShard : public RaftPart {
     void onLostLeadership(TermID term) override;
 
     void onElected(TermID term) override;
+    void onDiscoverNewLeader(HostAddr) override {}
 
     bool commitLogs(std::unique_ptr<LogIterator> iter) override;
 
@@ -72,6 +78,12 @@ class TestShard : public RaftPart {
                 LOG(INFO) << idStr_ << "Add learner " << learner;
                 break;
             }
+            case CommandType::TRANSFER_LEADER: {
+                auto nLeader = decodeTransferLeader(log);
+                preProcessTransLeader(nLeader);
+                LOG(INFO) << idStr_ << "Preprocess transfer leader " << nLeader;
+                break;
+            }
             default: {
                 break;
             }
diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp
index 4f2f1ec8d6b..c1e45314ec4 100644
--- a/src/kvstore/test/NebulaStoreTest.cpp
+++ b/src/kvstore/test/NebulaStoreTest.cpp
@@ -384,6 +384,130 @@ TEST(NebulaStoreTest, ThreeCopiesTest) {
     }
 }
 
+TEST(NebulaStoreTest, TransLeaderTest) {
+    fs::TempDir rootPath("/tmp/trans_leader_test.XXXXXX");
+    auto initNebulaStore = [](const std::vector<HostAddr>& peers,
+                              int32_t index,
+                              const std::string& path) -> std::unique_ptr<NebulaStore> {
+        LOG(INFO) << "Start nebula store on " << peers[index];
+        auto sIoThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
+        auto partMan = std::make_unique<MemPartManager>();
+        for (auto partId = 0; partId < 3; partId++) {
+            PartMeta pm;
+            pm.spaceId_ = 0;
+            pm.partId_ = partId;
+            pm.peers_ = peers;
+            partMan->partsMap_[0][partId] = std::move(pm);
+        }
+        std::vector<std::string> paths;
+        paths.emplace_back(folly::stringPrintf("%s/disk%d", path.c_str(), index));
+        KVOptions options;
+        options.dataPaths_ = std::move(paths);
+        options.partMan_ = std::move(partMan);
+        HostAddr local = peers[index];
+        return std::make_unique<NebulaStore>(std::move(options),
+                                             sIoThreadPool,
+                                             local,
+                                             getHandlers());
+    };
+
+    // 3 replicas, 3 partitions
+    int32_t replicas = 3;
+    IPv4 ip;
+    CHECK(network::NetworkUtils::ipv4ToInt("127.0.0.1", ip));
+    std::vector<HostAddr> peers;
+    for (int32_t i = 0; i < replicas; i++) {
+        peers.emplace_back(ip, network::NetworkUtils::getAvailablePort());
+    }
+
+    std::vector<std::unique_ptr<NebulaStore>> stores;
+    for (int i = 0; i < replicas; i++) {
+        stores.emplace_back(initNebulaStore(peers, i, rootPath.path()));
+        stores.back()->init();
+    }
+    sleep(FLAGS_raft_heartbeat_interval_secs);
+    LOG(INFO) << "Waiting for all leaders elected!";
+    int from = 0;
+    while (true) {
+        bool allLeaderElected = true;
+        for (int part = from; part < 3; part++) {
+            auto res = stores[0]->partLeader(0, part);
+            CHECK(ok(res));
+            auto leader = value(std::move(res));
+            if (leader == HostAddr(0, 0)) {
+                allLeaderElected = false;
+                from = part;
+                break;
+            }
+            LOG(INFO) << "Leader for part " << part << " is " << leader;
+        }
+        if (allLeaderElected) {
+            break;
+        }
+        usleep(100000);
+    }
+
+    auto findStoreIndex = [&] (const HostAddr& addr) {
+        for (size_t i = 0; i < peers.size(); i++) {
+            if (peers[i] == addr) {
+                return i;
+            }
+        }
+        LOG(FATAL) << "Should not reach here!";
+        return 0UL;
+    };
+
+    LOG(INFO) << "Transfer leader to first copy";
+    // All partitions transfer their leaders to the first replica
+    GraphSpaceID spaceId = 0;
+    for (int i = 0; i < 3; i++) {
+        PartitionID partId = i;
+        auto targetAddr = NebulaStore::getRaftAddr(peers[0]);
+        folly::Baton<true, std::atomic> baton;
+        LOG(INFO) << "Try to transfer leader to " << targetAddr.second;
+
+        auto leaderRet = stores[0]->partLeader(spaceId, partId);
+        CHECK(ok(leaderRet));
+        auto leader = value(std::move(leaderRet));
+        auto index = findStoreIndex(leader);
+        auto partRet = stores[index]->part(spaceId, partId);
+        CHECK(ok(partRet));
+        auto part = value(partRet);
+        part->asyncTransferLeader(targetAddr, [&] (kvstore::ResultCode code) {
+            EXPECT_EQ(ResultCode::SUCCEEDED, code);
+            baton.post();
+        });
+        baton.wait();
+    }
+    sleep(FLAGS_raft_heartbeat_interval_secs);
+    {
+        std::unordered_map<GraphSpaceID, std::vector<PartitionID>> leaderIds;
+        ASSERT_EQ(3, stores[0]->allLeader(leaderIds));
+    }
+
+    LOG(INFO) << "Manual leader balance";
+    // stores[0] transfers one leader to each replica, so each one holds a leader;
+    // leaders of the parts end up as {0: peers[0], 1: peers[1], 2: peers[2]}
+    for (int i = 0; i < 3; i++) {
+        PartitionID partId = i;
+        auto targetAddr = NebulaStore::getRaftAddr(peers[i]);
+        folly::Baton<true, std::atomic> baton;
+        auto ret = stores[0]->part(spaceId, partId);
+        CHECK(ok(ret));
+        auto part = nebula::value(ret);
+        part->asyncTransferLeader(targetAddr, [&] (kvstore::ResultCode code) {
+            EXPECT_EQ(ResultCode::SUCCEEDED, code);
+            baton.post();
+        });
+        baton.wait();
+    }
+    sleep(FLAGS_raft_heartbeat_interval_secs);
+    for (int i = 0; i < replicas; i++) {
+        std::unordered_map<GraphSpaceID, std::vector<PartitionID>> leaderIds;
+        ASSERT_EQ(1UL, stores[i]->allLeader(leaderIds));
+    }
+}
+
 }  // namespace kvstore
 }  // namespace nebula
diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt
index 6c7a2ea7087..20f135611fe 100644
--- a/src/meta/CMakeLists.txt
+++ b/src/meta/CMakeLists.txt
@@ -41,6 +41,7 @@ nebula_add_library(
     processors/admin/BalancePlan.cpp
     processors/admin/BalanceTask.cpp
     processors/admin/AdminClient.cpp
+    processors/admin/LeaderBalanceProcessor.cpp
     processors/configMan/RegConfigProcessor.cpp
     processors/configMan/GetConfigProcessor.cpp
     processors/configMan/ListConfigsProcessor.cpp
diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp
index b6febd1e688..443dce054a2 100644
--- a/src/meta/MetaServiceHandler.cpp
+++ b/src/meta/MetaServiceHandler.cpp
@@ -33,6 +33,7 @@
 #include "meta/processors/admin/HBProcessor.h"
 #include "meta/processors/usersMan/AuthenticationProcessor.h"
 #include "meta/processors/admin/BalanceProcessor.h"
+#include "meta/processors/admin/LeaderBalanceProcessor.h"
 #include "meta/processors/configMan/RegConfigProcessor.h"
 #include "meta/processors/configMan/GetConfigProcessor.h"
 #include "meta/processors/configMan/SetConfigProcessor.h"
@@ -78,7 +79,7 @@ MetaServiceHandler::future_addHosts(const cpp2::AddHostsReq& req) {
 
 folly::Future<cpp2::ListHostsResp>
 MetaServiceHandler::future_listHosts(const cpp2::ListHostsReq& req) {
-    auto* processor = ListHostsProcessor::instance(kvstore_);
+    auto* processor = ListHostsProcessor::instance(kvstore_, adminClient_.get());
     RETURN_FUTURE(processor);
 }
@@ -262,6 +263,12 @@ MetaServiceHandler::future_balance(const cpp2::BalanceReq& req) {
     RETURN_FUTURE(processor);
 }
 
+folly::Future<cpp2::ExecResp>
+MetaServiceHandler::future_leaderBalance(const cpp2::LeaderBalanceReq& req) {
+    auto* processor = LeaderBalanceProcessor::instance(kvstore_);
+    RETURN_FUTURE(processor);
+}
+
 folly::Future<cpp2::ExecResp>
 MetaServiceHandler::future_regConfig(const cpp2::RegConfigReq &req) {
     auto* processor = RegConfigProcessor::instance(kvstore_);
diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h
index e065d3d3644..68fce405b04 100644
--- a/src/meta/MetaServiceHandler.h
+++ b/src/meta/MetaServiceHandler.h
@@ -11,6 +11,7 @@
 #include <mutex>
 #include "interface/gen-cpp2/MetaService.h"
 #include "kvstore/KVStore.h"
+#include "meta/processors/admin/AdminClient.h"
 
 namespace nebula {
 namespace meta {
@@ -18,7 +19,9 @@ namespace meta {
 class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
 public:
     explicit MetaServiceHandler(kvstore::KVStore* kv, ClusterID clusterId = 0)
-        : kvstore_(kv), clusterId_(clusterId) {}
+        : kvstore_(kv), clusterId_(clusterId) {
+        adminClient_ = std::make_unique<AdminClient>(kvstore_);
+    }
 
     /**
      * Parts distribution related operations.
@@ -143,6 +146,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
     folly::Future<cpp2::BalanceResp>
     future_balance(const cpp2::BalanceReq& req) override;
 
+    folly::Future<cpp2::ExecResp>
+    future_leaderBalance(const cpp2::LeaderBalanceReq& req) override;
+
     folly::Future<cpp2::ExecResp>
     future_regConfig(const cpp2::RegConfigReq &req) override;
@@ -158,6 +164,7 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
 private:
     kvstore::KVStore* kvstore_ = nullptr;
     ClusterID clusterId_{0};
+    std::unique_ptr<AdminClient> adminClient_;
 };
 
 }  // namespace meta
diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp
index 6219338f55b..bc7cf12be8e 100644
--- a/src/meta/client/MetaClient.cpp
+++ b/src/meta/client/MetaClient.cpp
@@ -346,22 +346,6 @@ std::vector<HostAddr> MetaClient::to(const std::vector<nebula::cpp2::HostAddr>& tHosts) {
     return hosts;
 }
 
-std::vector<HostStatus> MetaClient::toHostStatus(const std::vector<cpp2::HostItem>& tHosts) {
-    std::vector<HostStatus> hosts;
-    hosts.resize(tHosts.size());
-    std::transform(tHosts.begin(), tHosts.end(), hosts.begin(), [](const auto& h) {
-        switch (h.get_status()) {
-            case cpp2::HostStatus::ONLINE:
-                return HostStatus(HostAddr(h.hostAddr.get_ip(), h.hostAddr.get_port()), "online");
-            case cpp2::HostStatus::OFFLINE:
-                return HostStatus(HostAddr(h.hostAddr.get_ip(), h.hostAddr.get_port()), "offline");
-            default:
-                return HostStatus(HostAddr(h.hostAddr.get_ip(), h.hostAddr.get_port()), "unknown");
-        }
-    });
-    return hosts;
-}
-
 std::vector<SpaceIdName> MetaClient::toSpaceIdName(const std::vector<cpp2::IdName>& tIdNames) {
     std::vector<SpaceIdName> idNames;
     idNames.resize(tIdNames.size());
@@ -567,14 +551,14 @@ folly::Future<StatusOr<bool>> MetaClient::addHosts(const std::vector<HostAddr>& hosts) {
     return future;
 }
 
-folly::Future<StatusOr<std::vector<HostStatus>>> MetaClient::listHosts() {
+folly::Future<StatusOr<std::vector<cpp2::HostItem>>> MetaClient::listHosts() {
     cpp2::ListHostsReq req;
-    folly::Promise<StatusOr<std::vector<HostStatus>>> promise;
+    folly::Promise<StatusOr<std::vector<cpp2::HostItem>>> promise;
     auto future = promise.getFuture();
     getResponse(std::move(req), [] (auto client, auto request) {
         return client->future_listHosts(request);
     }, [this] (cpp2::ListHostsResp&& resp) -> decltype(auto) {
-        return this->toHostStatus(resp.hosts);
+        return resp.hosts;
     }, std::move(promise));
     return future;
 }
@@ -1144,6 +1128,18 @@ folly::Future<StatusOr<int64_t>> MetaClient::balance() {
     return future;
 }
 
+folly::Future<StatusOr<bool>> MetaClient::balanceLeader() {
+    cpp2::LeaderBalanceReq req;
+    folly::Promise<StatusOr<bool>> promise;
+    auto future = promise.getFuture();
+    getResponse(std::move(req), [] (auto client, auto request) {
+        return client->future_leaderBalance(request);
+    }, [] (cpp2::ExecResp&& resp) -> bool {
+        return resp.code == cpp2::ErrorCode::SUCCEEDED;
+    }, std::move(promise), true);
+    return future;
+}
+
 folly::Future<StatusOr<bool>>
 MetaClient::regConfig(const std::vector<cpp2::ConfigItem>& items) {
     cpp2::RegConfigReq req;
diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h
index 08c41b0f317..84078f3bcde 100644
--- a/src/meta/client/MetaClient.h
+++ b/src/meta/client/MetaClient.h
@@ -135,7 +135,7 @@ class MetaClient {
     folly::Future<StatusOr<bool>>
     addHosts(const std::vector<HostAddr>& hosts);
 
-    folly::Future<StatusOr<std::vector<HostStatus>>>
+    folly::Future<StatusOr<std::vector<cpp2::HostItem>>>
     listHosts();
 
     folly::Future<StatusOr<bool>>
@@ -207,6 +207,8 @@ class MetaClient {
 
     folly::Future<StatusOr<int64_t>> balance();
 
+    folly::Future<StatusOr<bool>> balanceLeader();
+
     // Operations for config
     folly::Future<StatusOr<bool>>
     regConfig(const std::vector<cpp2::ConfigItem>& items);
@@ -314,8 +316,6 @@ class MetaClient {
 
     std::vector<HostAddr> to(const std::vector<nebula::cpp2::HostAddr>& hosts);
 
-    std::vector<HostStatus> toHostStatus(const std::vector<cpp2::HostItem>& thosts);
-
     std::vector<SpaceIdName> toSpaceIdName(const std::vector<cpp2::IdName>& tIdNames);
 
     ConfigItem toConfigItem(const cpp2::ConfigItem& item);
diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp
index b507d639433..f4299300d01 100644
--- a/src/meta/processors/admin/AdminClient.cpp
+++ b/src/meta/processors/admin/AdminClient.cpp
@@ -7,6 +7,7 @@
 #include "meta/processors/admin/AdminClient.h"
 #include "meta/MetaServiceUtils.h"
 #include "meta/processors/Common.h"
+#include "meta/ActiveHostsMan.h"
 
 DEFINE_int32(max_retry_times_admin_op, 3, "max retry times for admin request!");
@@ -192,18 +193,22 @@ folly::Future<Status> AdminClient::getResponse(
     folly::Promise<Status> pro;
     auto f = pro.getFuture();
     auto* evb = ioThreadPool_->getEventBase();
-    auto client = clientsMan_->client(host, evb);
-    remoteFunc(client, std::move(req))
-        .then(evb, [p = std::move(pro),
-                    respGen] (folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
-            // exception occurred during RPC
-            if (t.hasException()) {
-                p.setValue(Status::Error(folly::stringPrintf("RPC failure in MetaClient: %s",
-                                                             t.exception().what().c_str())));
-                return;
-            }
-            auto&& resp = std::move(t).value();
-            p.setValue(respGen(std::move(resp)));
+    folly::via(evb, [evb, pro = std::move(pro), host, req = std::move(req),
+                     remoteFunc = std::move(remoteFunc), respGen = std::move(respGen),
+                     this] () mutable {
+        auto client = clientsMan_->client(host, evb);
+        remoteFunc(client, std::move(req))
+            .then(evb, [p = std::move(pro), respGen = std::move(respGen),
+                        this] (folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
+                // exception occurred during RPC
+                if (t.hasException()) {
+                    p.setValue(Status::Error(folly::stringPrintf("RPC failure in AdminClient: %s",
+                                                                 t.exception().what().c_str())));
+                    return;
+                }
+                auto&& resp = std::move(t).value();
+                p.setValue(respGen(std::move(resp)));
+            });
     });
     return f;
 }
@@ -220,87 +225,87 @@ void AdminClient::getResponse(
     auto* evb = ioThreadPool_->getEventBase();
     CHECK_GE(index, 0);
     CHECK_LT(index, hosts.size());
-    auto client = clientsMan_->client(hosts[index], evb);
-    remoteFunc(client, req)
-        .then(evb, [p = std::move(pro),
-                    hosts = std::move(hosts),
-                    index,
-                    req = std::move(req),
-                    remoteFunc = std::move(remoteFunc),
-                    retry,
-                    retryLimit,
-                    this] (folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
-            // exception occurred during RPC
-            if (t.hasException()) {
-                if (retry < retryLimit) {
-                    LOG(INFO) << "Rpc failure to " << hosts[index]
-                              << ", retry " << retry
-                              << ", limit " << retryLimit;
-                    getResponse(std::move(hosts),
-                                index + 1,
-                                std::move(req),
-                                remoteFunc,
-                                retry + 1,
-                                std::move(p),
-                                retryLimit);
-                    return;
-                }
-                p.setValue(Status::Error(folly::stringPrintf("RPC failure in MetaClient: %s",
-                                                             t.exception().what().c_str())));
-                return;
-            }
-            auto resp = std::move(t).value();
-            switch (resp.get_code()) {
-                case storage::cpp2::ErrorCode::SUCCEEDED: {
-                    p.setValue(Status::OK());
-                    return;
-                }
-                case storage::cpp2::ErrorCode::E_LEADER_CHANGED: {
+    folly::via(evb, [evb, hosts = std::move(hosts), index, req = std::move(req),
+                     remoteFunc = std::move(remoteFunc), retry, pro = std::move(pro),
+                     retryLimit, this] () mutable {
+        auto client = clientsMan_->client(hosts[index], evb);
+        remoteFunc(client, req)
+            .then(evb, [p = std::move(pro), hosts = std::move(hosts), index, req = std::move(req),
+                        remoteFunc = std::move(remoteFunc), retry, retryLimit,
+                        this] (folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
+                // exception occurred during RPC
+                if (t.hasException()) {
                     if (retry < retryLimit) {
-                        HostAddr leader(resp.get_leader().get_ip(), resp.get_leader().get_port());
-                        int32_t leaderIndex = 0;
-                        for (auto& h : hosts) {
-                            if (h == leader) {
-                                break;
-                            }
-                            leaderIndex++;
-                        }
-                        LOG(INFO) << "Return leder change from " << hosts[index]
+                        LOG(INFO) << "Rpc failure to " << hosts[index]
                                   << ", retry " << retry
                                   << ", limit " << retryLimit;
                         getResponse(std::move(hosts),
-                                    leaderIndex,
+                                    index + 1,
                                     std::move(req),
-                                    std::move(remoteFunc),
+                                    remoteFunc,
                                     retry + 1,
                                     std::move(p),
                                     retryLimit);
                         return;
                     }
-                    p.setValue(Status::Error("Leader changed!"));
+                    p.setValue(Status::Error(folly::stringPrintf("RPC failure in AdminClient: %s",
+                                                                 t.exception().what().c_str())));
                     return;
                 }
-                default: {
-                    if (retry < retryLimit) {
-                        LOG(INFO) << "Unknown code " << static_cast<int32_t>(resp.get_code())
-                                  << " from " << hosts[index]
-                                  << ", retry " << retry
-                                  << ", limit " << retryLimit;
-                        getResponse(std::move(hosts),
-                                    index + 1,
-                                    std::move(req),
-                                    std::move(remoteFunc),
-                                    retry + 1,
-                                    std::move(p),
-                                    retryLimit);
+                auto resp = std::move(t).value();
+                switch (resp.get_code()) {
+                    case storage::cpp2::ErrorCode::SUCCEEDED: {
+                        p.setValue(Status::OK());
+                        return;
+                    }
+                    case storage::cpp2::ErrorCode::E_LEADER_CHANGED: {
+                        if (retry < retryLimit) {
+                            HostAddr leader(resp.get_leader().get_ip(), resp.get_leader().get_port());
+                            int32_t leaderIndex = 0;
+                            for (auto& h : hosts) {
+                                if (h == leader) {
+                                    break;
+                                }
+                                leaderIndex++;
+                            }
+                            LOG(INFO) << "Return leader change from " << hosts[index]
+                                      << ", new leader is " << leader
+                                      << ", retry " << retry
+                                      << ", limit " << retryLimit;
+                            getResponse(std::move(hosts),
+                                        leaderIndex,
+                                        std::move(req),
+                                        std::move(remoteFunc),
+                                        retry + 1,
+                                        std::move(p),
+                                        retryLimit);
+                            return;
+                        }
+                        p.setValue(Status::Error("Leader changed!"));
+                        return;
+                    }
+                    default: {
+                        if (retry < retryLimit) {
+                            LOG(INFO) << "Unknown code " << static_cast<int32_t>(resp.get_code())
+                                      << " from " << hosts[index]
+                                      << ", retry " << retry
+                                      << ", limit " << retryLimit;
+                            getResponse(std::move(hosts),
+                                        index + 1,
+                                        std::move(req),
+                                        std::move(remoteFunc),
+                                        retry + 1,
+                                        std::move(p),
+                                        retryLimit);
+                            return;
+                        }
+                        p.setValue(Status::Error("Unknown code %d",
+                                                 static_cast<int32_t>(resp.get_code())));
                         return;
                     }
-                    p.setValue(Status::Error("Unknown code %d", static_cast<int32_t>(resp.get_code())));
-                    return;
                 }
-            }
-        });
+            });  // then
+    });  // via
 }
 
 nebula::cpp2::HostAddr AdminClient::to(const HostAddr& addr) {
@@ -334,5 +339,52 @@ StatusOr<std::vector<HostAddr>> AdminClient::getPeers(GraphSpaceID spaceId, PartitionID partId) {
     return Status::Error("Get Failed");
 }
 
+folly::Future<Status> AdminClient::getLeaderDist(HostLeaderMap* result) {
+    if (injector_) {
+        return injector_->getLeaderDist(result);
+    }
+
+    folly::Promise<Status> promise;
+    auto future = promise.getFuture();
+    auto allHosts = ActiveHostsMan::getActiveHosts(kv_);
+
+    auto getLeader = [result, this] (const HostAddr& host) {
+        folly::Promise<Status> pro;
+        auto f = pro.getFuture();
+        auto* evb = ioThreadPool_->getEventBase();
+        folly::via(evb, [pro = std::move(pro), host, evb, result, this] () mutable {
+            storage::cpp2::GetLeaderReq req;
+            auto client = clientsMan_->client(host, evb);
+            client->future_getLeaderPart(std::move(req))
+                .then(evb, [p = std::move(pro), host,
+                            result] (folly::Try<storage::cpp2::GetLeaderResp>&& t) mutable {
+                    if (t.hasException()) {
+                        LOG(ERROR) << folly::stringPrintf("RPC failure in AdminClient: %s",
                                                          t.exception().what().c_str());
+                        p.setValue(Status::Error("RPC failure in AdminClient"));
+                        return;
+                    }
+                    auto&& resp = std::move(t).value();
+                    (*result)[host] = std::move(resp.get_leader_parts());
+                    p.setValue(Status::OK());
+                });
+        });
+        return f;
+    };
+
+    std::vector<folly::Future<Status>> hostFutures;
+    for (const auto& h : allHosts) {
+        auto fut = getLeader(h);
+        hostFutures.emplace_back(std::move(fut));
+    }
+
+    folly::collectAll(hostFutures)
+        .then([p = std::move(promise)] (std::vector<folly::Try<Status>>&& tries) mutable {
+            UNUSED(tries);
+            p.setValue(Status::OK());
+        });
+
+    return future;
+}
+
 }  // namespace meta
 }  // namespace nebula
diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h
index 5952212ef67..7eca7d0de50 100644
--- a/src/meta/processors/admin/AdminClient.h
+++ b/src/meta/processors/admin/AdminClient.h
@@ -17,6 +17,10 @@ namespace nebula {
 namespace meta {
 
+using HostLeaderMap = std::unordered_map<HostAddr,
+                                         std::unordered_map<GraphSpaceID,
+                                                            std::vector<PartitionID>>>;
+
 class FaultInjector {
 public:
     virtual ~FaultInjector() = default;
@@ -27,6 +31,7 @@ class FaultInjector {
     virtual folly::Future<Status> memberChange() = 0;
     virtual folly::Future<Status> updateMeta() = 0;
     virtual folly::Future<Status> removePart() = 0;
+    virtual folly::Future<Status> getLeaderDist(HostLeaderMap* hostLeaderMap) = 0;
 };
 
 static const HostAddr kRandomPeer(0, 0);
@@ -74,6 +79,8 @@ class AdminClient {
                                                 PartitionID partId,
                                                 const HostAddr& host);
 
+    folly::Future<Status> getLeaderDist(HostLeaderMap* result);
+
     FaultInjector* faultInjector() {
         return injector_.get();
     }
std::make_unique(time::WallClock::fastNowInSec(), kv_, client_.get()); for (auto spaceId : spaces) { @@ -256,6 +267,7 @@ void Balancer::getHostParts(GraphSpaceID spaceId, iter->next(); totalParts++; } + auto key = MetaServiceUtils::spaceKey(spaceId); std::string value; auto code = kv_->get(kDefaultSpaceId, kDefaultPartId, key, &value); @@ -313,6 +325,248 @@ StatusOr Balancer::hostWithMinimalParts( return Status::Error("No host is suitable for %d", partId); } +cpp2::ErrorCode Balancer::leaderBalance() { + folly::Promise promise; + auto future = promise.getFuture(); + + std::vector spaces; + kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED; + if (!getAllSpaces(spaces, ret)) { + LOG(ERROR) << "Can't access kvstore, ret = d" + << static_cast(ret); + return cpp2::ErrorCode::E_STORE_FAILURE; + } + + bool expected = false; + if (inLeaderBalance_.compare_exchange_strong(expected, true)) { + hostLeaderMap_.reset(new HostLeaderMap); + auto status = client_->getLeaderDist(hostLeaderMap_.get()).get(); + + if (!status.ok()) { + inLeaderBalance_ = false; + return cpp2::ErrorCode::E_RPC_FAILURE; + } + + LeaderBalancePlan plan; + for (const auto& space : spaces) { + buildLeaderBalancePlan(hostLeaderMap_.get(), space, plan); + simplifyLeaderBalnacePlan(space, plan); + } + std::vector> futures; + for (const auto& task : plan) { + futures.emplace_back(client_->transLeader(std::get<0>(task), std::get<1>(task), + std::move(std::get<2>(task)), + std::move(std::get<3>(task)))); + } + + int32_t failed = 0; + folly::collectAll(futures).then([&](const std::vector>& tries) { + for (const auto& t : tries) { + if (!t.value().ok()) { + ++failed; + } + } + }).wait(); + LOG(INFO) << failed << " partiton failed to transfer leader"; + inLeaderBalance_ = false; + return cpp2::ErrorCode::SUCCEEDED; + } + return cpp2::ErrorCode::E_BALANCER_RUNNING; +} + +std::unordered_map> +Balancer::buildLeaderBalancePlan(HostLeaderMap* hostLeaderMap, GraphSpaceID spaceId, + LeaderBalancePlan& plan, bool useDeviation) { + std::unordered_map> peersMap; + std::unordered_map> leaderHostParts; + size_t leaderParts = 0; + { + // store peers of all paritions in peerMap + folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + auto prefix = MetaServiceUtils::partPrefix(spaceId); + std::unique_ptr iter; + auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId; + return leaderHostParts; + } + while (iter->valid()) { + auto key = iter->key(); + PartitionID partId; + memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID)); + auto thriftPeers = MetaServiceUtils::parsePartVal(iter->val()); + std::vector peers; + peers.resize(thriftPeers.size()); + std::transform(thriftPeers.begin(), thriftPeers.end(), peers.begin(), + [] (const auto& h) { return HostAddr(h.get_ip(), h.get_port()); }); + peersMap[partId] = std::move(peers); + ++leaderParts; + iter->next(); + } + } + + int32_t totalParts = 0; + std::unordered_map> allHostParts; + getHostParts(spaceId, allHostParts, totalParts); + + size_t avgLoad = leaderParts / hostLeaderMap->size(); + size_t minLoad = avgLoad; + size_t maxLoad = avgLoad + 1; + if (useDeviation) { + minLoad = std::ceil(static_cast (leaderParts) / hostLeaderMap->size() * + (1 - FLAGS_leader_balance_deviation)); + maxLoad = std::floor(static_cast (leaderParts) / hostLeaderMap->size() * + (1 + FLAGS_leader_balance_deviation)); + } + LOG(INFO) << "Build leader balance plan, expeceted min 
load: " << minLoad + << ", max load: " << maxLoad; + + std::unordered_set activeHosts; + for (const auto& host : *hostLeaderMap) { + activeHosts.emplace(host.first); + leaderHostParts[host.first] = std::move((*hostLeaderMap)[host.first][spaceId]); + } + + while (true) { + bool hasUnbalancedHost = false; + int32_t taskCount = 0; + + for (auto& hostEntry : leaderHostParts) { + if (minLoad <= hostEntry.second.size() && hostEntry.second.size() <= maxLoad) { + continue; + } + hasUnbalancedHost = true; + if (hostEntry.second.size() < minLoad) { + // need to acquire leader from other hosts + taskCount += acquireLeaders(allHostParts, leaderHostParts, peersMap, activeHosts, + hostEntry.first, minLoad, plan, spaceId); + } else { + // need to transfer leader to other hosts + taskCount += giveupLeaders(leaderHostParts, peersMap, activeHosts, + hostEntry.first, maxLoad, plan, spaceId); + } + } + + // If every host is balanced or no more task during this loop, then the plan is done + if (!hasUnbalancedHost || taskCount == 0) { + break; + } + } + return leaderHostParts; +} + +int32_t Balancer::acquireLeaders( + std::unordered_map>& allHostParts, + std::unordered_map>& leaderHostParts, + std::unordered_map>& peersMap, + std::unordered_set& activeHosts, + HostAddr host, + size_t minLoad, + LeaderBalancePlan& plan, + GraphSpaceID spaceId) { + // host will loop for the partition which is not leader, and try to acuire the leader + int32_t taskCount = 0; + std::vector diff; + std::set_difference(allHostParts[host].begin(), allHostParts[host].end(), + leaderHostParts[host].begin(), leaderHostParts[host].end(), + std::back_inserter(diff)); + auto& hostLeaders = leaderHostParts[host]; + for (const auto& partId : diff) { + // find the leader of partId + auto peers = peersMap[partId]; + for (const auto& peer : peers) { + if (peer == host || !activeHosts.count(peer)) { + continue; + } + // if peer is the leader of partId and can transfer, then transfer it to host + auto& peerLeaders = leaderHostParts[peer]; + auto it = std::find(peerLeaders.begin(), peerLeaders.end(), partId); + if (it != peerLeaders.end()) { + if (minLoad < peerLeaders.size()) { + peerLeaders.erase(it); + hostLeaders.emplace_back(partId); + plan.emplace_back(spaceId, partId, peer, host); + LOG(INFO) << "plan trans leader: " << spaceId << " " << partId << " from " + << network::NetworkUtils::intToIPv4(peer.first) << ":" + << peer.second << " to " + << network::NetworkUtils::intToIPv4(host.first) + << ":" << host.second; + ++taskCount; + break; + } + } + } + // if host has enough leader, just return + if (minLoad <= hostLeaders.size()) { + break; + } + } + return taskCount; +} + +int32_t Balancer::giveupLeaders( + std::unordered_map>& leaderHostParts, + std::unordered_map>& peersMap, + std::unordered_set& activeHosts, + HostAddr host, + size_t maxLoad, + LeaderBalancePlan& plan, + GraphSpaceID spaceId) { + int taskCount = 0; + auto& hostLeaders = leaderHostParts[host]; + // host will try to transfer the extra leaders to other peers + for (auto it = hostLeaders.begin(); it != hostLeaders.end(); ) { + // find the leader of partId + auto partId = *it; + auto peers = peersMap[partId]; + bool transfered = false; + for (const auto& peer : peers) { + if (host == peer || !activeHosts.count(peer)) { + continue; + } + // If peer can accept this partition leader, than host will transfer to the peer + auto& peerLeaders = leaderHostParts[peer]; + if (peerLeaders.size() < maxLoad) { + it = hostLeaders.erase(it); + peerLeaders.emplace_back(partId); + 
+int32_t Balancer::giveupLeaders(
+        std::unordered_map<HostAddr, std::vector<PartitionID>>& leaderHostParts,
+        std::unordered_map<PartitionID, std::vector<HostAddr>>& peersMap,
+        std::unordered_set<HostAddr>& activeHosts,
+        HostAddr host,
+        size_t maxLoad,
+        LeaderBalancePlan& plan,
+        GraphSpaceID spaceId) {
+    int taskCount = 0;
+    auto& hostLeaders = leaderHostParts[host];
+    // the host tries to hand its extra leaders over to its peers
+    for (auto it = hostLeaders.begin(); it != hostLeaders.end(); ) {
+        auto partId = *it;
+        auto peers = peersMap[partId];
+        bool transferred = false;
+        for (const auto& peer : peers) {
+            if (host == peer || !activeHosts.count(peer)) {
+                continue;
+            }
+            // If the peer can accept this partition's leadership, then transfer it to the peer
+            auto& peerLeaders = leaderHostParts[peer];
+            if (peerLeaders.size() < maxLoad) {
+                it = hostLeaders.erase(it);
+                peerLeaders.emplace_back(partId);
+                plan.emplace_back(spaceId, partId, host, peer);
+                LOG(INFO) << "plan trans leader: " << spaceId << " " << partId << " host "
+                          << network::NetworkUtils::intToIPv4(host.first) << ":"
+                          << host.second << " peer "
+                          << network::NetworkUtils::intToIPv4(peer.first)
+                          << ":" << peer.second;
+                ++taskCount;
+                transferred = true;
+                break;
+            }
+        }
+        if (!transferred) {
+            ++it;
+        }
+        // once the host is back under the limit, stop
+        if (hostLeaders.size() <= maxLoad) {
+            break;
+        }
+    }
+    return taskCount;
+}
+
+void Balancer::simplifyLeaderBalancePlan(GraphSpaceID spaceId, LeaderBalancePlan& plan) {
+    // Within a leader balance plan, a partition's leadership may move several times,
+    // but we only need to transfer it from the first host to the last one and can
+    // ignore the intermediate hops
+    std::unordered_map<PartitionID, LeaderBalancePlan> buckets;
+    for (auto& task : plan) {
+        buckets[std::get<1>(task)].emplace_back(task);
+    }
+    plan.clear();
+    for (const auto& partEntry : buckets) {
+        plan.emplace_back(spaceId, partEntry.first,
+                          std::get<2>(partEntry.second.front()),
+                          std::get<3>(partEntry.second.back()));
+    }
+}
+
 }  // namespace meta
 }  // namespace nebula
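A self-contained sketch of the same collapsing step, with toy types standing in for GraphSpaceID, PartitionID, and HostAddr. It assumes tasks for one partition appear in the plan in generation order, which the bucketing above preserves:

#include <tuple>
#include <unordered_map>
#include <vector>

using Task = std::tuple<int, int, char, char>;  // (space, part, from, to)

// Keep only the first source and the last target of each partition's chain.
std::vector<Task> simplify(const std::vector<Task>& plan) {
    std::unordered_map<int, std::vector<Task>> buckets;
    for (const auto& t : plan) {
        buckets[std::get<1>(t)].push_back(t);
    }
    std::vector<Task> out;
    for (const auto& [part, chain] : buckets) {
        out.emplace_back(std::get<0>(chain.front()), part,
                         std::get<2>(chain.front()), std::get<3>(chain.back()));
    }
    return out;  // {(1,1,'A','B'), (1,1,'B','C')}  ->  {(1,1,'A','C')}
}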
diff --git a/src/meta/processors/admin/Balancer.h b/src/meta/processors/admin/Balancer.h
index 1e906688690..a633db1e2e1 100644
--- a/src/meta/processors/admin/Balancer.h
+++ b/src/meta/processors/admin/Balancer.h
@@ -18,6 +18,9 @@
 namespace nebula {
 namespace meta {
 
+using LeaderBalancePlan = std::vector<std::tuple<GraphSpaceID, PartitionID, HostAddr, HostAddr>>;
+
 /** There are two interfaces public:
  * Balance:  it will construct a balance plan and invoked it. If last balance plan is not succeeded, it will
@@ -41,10 +44,16 @@ class Balancer {
     FRIEND_TEST(BalanceTest, BalancePartsTest);
     FRIEND_TEST(BalanceTest, NormalTest);
     FRIEND_TEST(BalanceTest, RecoveryTest);
+    FRIEND_TEST(BalanceTest, LeaderBalancePlanTest);
+    FRIEND_TEST(BalanceTest, SimpleLeaderBalancePlanTest);
+    FRIEND_TEST(BalanceTest, IntersectHostsLeaderBalancePlanTest);
+    FRIEND_TEST(BalanceTest, LeaderBalanceTest);
+    FRIEND_TEST(BalanceTest, ManyHostsLeaderBalancePlanTest);
+    FRIEND_TEST(BalanceIntegrationTest, LeaderBalanceTest);
 
 public:
     static Balancer* instance(kvstore::KVStore* kv) {
-        static std::unique_ptr<AdminClient> client(new AdminClient());
+        static std::unique_ptr<AdminClient> client(new AdminClient(kv));
         static std::unique_ptr<Balancer> balancer(new Balancer(kv, std::move(client)));
         return balancer.get();
     }
@@ -86,6 +95,8 @@ class Balancer {
         return Status::Error("Unsupport it yet!");
     }
 
+    cpp2::ErrorCode leaderBalance();
+
 private:
     Balancer(kvstore::KVStore* kv, std::unique_ptr<AdminClient> client)
         : kv_(kv)
@@ -127,6 +138,31 @@ class Balancer {
     std::vector<std::pair<HostAddr, int32_t>>
     sortedHostsByParts(const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts);
 
+    bool getAllSpaces(std::vector<GraphSpaceID>& spaces, kvstore::ResultCode& retCode);
+
+    std::unordered_map<HostAddr, std::vector<PartitionID>>
+    buildLeaderBalancePlan(HostLeaderMap* hostLeaderMap, GraphSpaceID spaceId,
+                           LeaderBalancePlan& plan, bool useDeviation = true);
+
+    void simplifyLeaderBalancePlan(GraphSpaceID spaceId, LeaderBalancePlan& plan);
+
+    int32_t acquireLeaders(std::unordered_map<HostAddr, std::vector<PartitionID>>& allHostParts,
+                           std::unordered_map<HostAddr, std::vector<PartitionID>>& leaderHostParts,
+                           std::unordered_map<PartitionID, std::vector<HostAddr>>& peersMap,
+                           std::unordered_set<HostAddr>& activeHosts,
+                           HostAddr to,
+                           size_t minLoad,
+                           LeaderBalancePlan& plan,
+                           GraphSpaceID spaceId);
+
+    int32_t giveupLeaders(std::unordered_map<HostAddr, std::vector<PartitionID>>& leaderHostParts,
+                          std::unordered_map<PartitionID, std::vector<HostAddr>>& peersMap,
+                          std::unordered_set<HostAddr>& activeHosts,
+                          HostAddr from,
+                          size_t maxLoad,
+                          LeaderBalancePlan& plan,
+                          GraphSpaceID spaceId);
+
 private:
     std::atomic_bool running_{false};
     kvstore::KVStore* kv_ = nullptr;
@@ -134,6 +170,8 @@ class Balancer {
     // Current running plan.
     std::unique_ptr<BalancePlan> plan_{nullptr};
     std::unique_ptr<folly::Executor> executor_;
+    std::atomic_bool inLeaderBalance_{false};
+    std::unique_ptr<HostLeaderMap> hostLeaderMap_;
 };
 
 }  // namespace meta
diff --git a/src/meta/processors/admin/LeaderBalanceProcessor.cpp b/src/meta/processors/admin/LeaderBalanceProcessor.cpp
new file mode 100644
index 00000000000..5c2076e6c3f
--- /dev/null
+++ b/src/meta/processors/admin/LeaderBalanceProcessor.cpp
@@ -0,0 +1,24 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+
+#include "meta/processors/admin/LeaderBalanceProcessor.h"
+#include "meta/processors/admin/Balancer.h"
+
+namespace nebula {
+namespace meta {
+
+void LeaderBalanceProcessor::process(const cpp2::LeaderBalanceReq& req) {
+    UNUSED(req);
+    auto ret = Balancer::instance(kvstore_)->leaderBalance();
+    resp_.set_code(ret);
+    onFinished();
+}
+
+
+}  // namespace meta
+}  // namespace nebula
+
diff --git a/src/meta/processors/admin/LeaderBalanceProcessor.h b/src/meta/processors/admin/LeaderBalanceProcessor.h
new file mode 100644
index 00000000000..97eb4b90f92
--- /dev/null
+++ b/src/meta/processors/admin/LeaderBalanceProcessor.h
@@ -0,0 +1,32 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef META_LEADERBALANCEPROCESSOR_H_
+#define META_LEADERBALANCEPROCESSOR_H_
+
+#include <gtest/gtest_prod.h>
+#include "meta/processors/BaseProcessor.h"
+
+namespace nebula {
+namespace meta {
+
+class LeaderBalanceProcessor : public BaseProcessor<cpp2::ExecResp> {
+public:
+    static LeaderBalanceProcessor* instance(kvstore::KVStore* kvstore) {
+        return new LeaderBalanceProcessor(kvstore);
+    }
+
+    void process(const cpp2::LeaderBalanceReq& req);
+
+private:
+    explicit LeaderBalanceProcessor(kvstore::KVStore* kvstore)
+            : BaseProcessor<cpp2::ExecResp>(kvstore) {}
+};
+
+}  // namespace meta
+}  // namespace nebula
+
+#endif  // META_LEADERBALANCEPROCESSOR_H_
Failed: No hosts"; + resp_.set_code(cpp2::ErrorCode::E_NO_HOSTS); + return Status::Error("Can't access kvstore, ret = %d", static_cast(kvRet)); } auto now = time::WallClock::fastNowInSec(); while (iter->valid()) { cpp2::HostItem item; nebula::cpp2::HostAddr host; - auto hostAddrPiece = iter->key().subpiece(prefix.size()); + auto hostAddrPiece = iter->key().subpiece(hostPrefix.size()); memcpy(&host, hostAddrPiece.data(), hostAddrPiece.size()); item.set_hostAddr(host); HostInfo info = HostInfo::decode(iter->val()); @@ -52,6 +54,75 @@ StatusOr> ListHostsProcessor::allHostsWithStatus() { iter->next(); } + // Get all spaces + std::vector spaces; + const auto& spacePrefix = MetaServiceUtils::spacePrefix(); + kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, spacePrefix, &iter); + if (kvRet != kvstore::ResultCode::SUCCEEDED) { + return hostItems; + } + while (iter->valid()) { + auto spaceId = MetaServiceUtils::spaceId(iter->key()); + spaces.push_back(spaceId); + iter->next(); + } + + std::unordered_map>> allParts; + for (const auto& spaceId : spaces) { + std::unordered_map> hostParts; + const auto& partPrefix = MetaServiceUtils::partPrefix(spaceId); + kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter); + if (kvRet != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "List Hosts Failed: No partitions"; + resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); + return Status::Error("Cant't find any partitions"); + } + while (iter->valid()) { + auto key = iter->key(); + PartitionID partId; + memcpy(&partId, key.data() + partPrefix.size(), sizeof(PartitionID)); + auto partHosts = MetaServiceUtils::parsePartVal(iter->val()); + for (auto& host : partHosts) { + hostParts[HostAddr(host.ip, host.port)].emplace_back(partId); + } + iter->next(); + } + + for (const auto& hostEntry : hostParts) { + allParts[hostEntry.first][spaceId] = std::move(hostEntry.second); + } + } + + for (const auto& hostEntry : allParts) { + auto hostAddr = toThriftHost(hostEntry.first); + auto it = std::find_if(hostItems.begin(), hostItems.end(), [&](const auto& item) { + return item.get_hostAddr() == hostAddr; + }); + if (it != hostItems.end()) { + it->set_all_parts(std::move(hostEntry.second)); + } + } + + if (adminClient_ == nullptr) { + return hostItems; + } + HostLeaderMap hostLeaderMap; + auto ret = adminClient_->getLeaderDist(&hostLeaderMap).get(); + if (!ret.ok()) { + LOG(ERROR) << "Get leader distribution failed"; + return hostItems; + } + for (auto& hostEntry : hostLeaderMap) { + auto hostAddr = toThriftHost(hostEntry.first); + auto it = std::find_if(hostItems.begin(), hostItems.end(), [&](const auto& item) { + return item.get_hostAddr() == hostAddr; + }); + if (it != hostItems.end()) { + it->set_leader_parts(std::move(hostEntry.second)); + } + } + return hostItems; } diff --git a/src/meta/processors/partsMan/ListHostsProcessor.h b/src/meta/processors/partsMan/ListHostsProcessor.h index 0ed6bf9ecbb..673e346ffb9 100644 --- a/src/meta/processors/partsMan/ListHostsProcessor.h +++ b/src/meta/processors/partsMan/ListHostsProcessor.h @@ -8,26 +8,31 @@ #define META_LISTHOSTSPROCESSOR_H_ #include "meta/processors/BaseProcessor.h" +#include "meta/processors/admin/AdminClient.h" namespace nebula { namespace meta { class ListHostsProcessor : public BaseProcessor { public: - static ListHostsProcessor* instance(kvstore::KVStore* kvstore) { - return new ListHostsProcessor(kvstore); + static ListHostsProcessor* instance(kvstore::KVStore* kvstore, + AdminClient* adminClient = nullptr) { + return new 
diff --git a/src/meta/processors/partsMan/ListHostsProcessor.h b/src/meta/processors/partsMan/ListHostsProcessor.h
index 0ed6bf9ecbb..673e346ffb9 100644
--- a/src/meta/processors/partsMan/ListHostsProcessor.h
+++ b/src/meta/processors/partsMan/ListHostsProcessor.h
@@ -8,26 +8,31 @@
 #define META_LISTHOSTSPROCESSOR_H_
 
 #include "meta/processors/BaseProcessor.h"
+#include "meta/processors/admin/AdminClient.h"
 
 namespace nebula {
 namespace meta {
 
 class ListHostsProcessor : public BaseProcessor<cpp2::ListHostsResp> {
 public:
-    static ListHostsProcessor* instance(kvstore::KVStore* kvstore) {
-        return new ListHostsProcessor(kvstore);
+    static ListHostsProcessor* instance(kvstore::KVStore* kvstore,
+                                        AdminClient* adminClient = nullptr) {
+        return new ListHostsProcessor(kvstore, adminClient);
     }
 
     void process(const cpp2::ListHostsReq& req);
 
 private:
-    explicit ListHostsProcessor(kvstore::KVStore* kvstore)
-            : BaseProcessor<cpp2::ListHostsResp>(kvstore) {}
+    explicit ListHostsProcessor(kvstore::KVStore* kvstore, AdminClient* adminClient)
+            : BaseProcessor<cpp2::ListHostsResp>(kvstore)
+            , adminClient_(adminClient) {}
 
     /**
     * Get all hosts with online/offline status.
     * */
     StatusOr<std::vector<cpp2::HostItem>> allHostsWithStatus();
+
+    AdminClient* adminClient_;
 };
 
 }  // namespace meta
diff --git a/src/meta/test/BalanceIntegrationTest.cpp b/src/meta/test/BalanceIntegrationTest.cpp
index 251bff8d470..a7dd77afc1b 100644
--- a/src/meta/test/BalanceIntegrationTest.cpp
+++ b/src/meta/test/BalanceIntegrationTest.cpp
@@ -12,6 +12,10 @@
 #include "storage/test/TestUtils.h"
 #include "fs/TempDir.h"
 
+DECLARE_int32(load_data_interval_secs);
+DECLARE_int32(heartbeat_interval_secs);
+DECLARE_uint32(raft_heartbeat_interval_secs);
+
 namespace nebula {
 namespace meta {
 
@@ -22,6 +26,88 @@ TEST(BalanceIntegrationTest, SimpleTest) {
     LOG(INFO) << "Start storage server on " << sc->port_;
 }
 
+TEST(BalanceIntegrationTest, LeaderBalanceTest) {
+    FLAGS_load_data_interval_secs = 1;
+    FLAGS_heartbeat_interval_secs = 1;
+    FLAGS_raft_heartbeat_interval_secs = 1;
+    fs::TempDir rootPath("/tmp/balance_integration_test.XXXXXX");
+    IPv4 localIp;
+    network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp);
+    const nebula::ClusterID kClusterId = 10;
+
+    uint32_t localMetaPort = network::NetworkUtils::getAvailablePort();
+    LOG(INFO) << "Start meta server....";
+    std::string metaPath = folly::stringPrintf("%s/meta", rootPath.path());
+    auto metaServerContext = meta::TestUtils::mockMetaServer(localMetaPort, metaPath.c_str(),
+                                                             kClusterId);
+    localMetaPort = metaServerContext->port_;
+
+    auto adminClient = std::make_unique<AdminClient>(metaServerContext->kvStore_.get());
+    Balancer balancer(metaServerContext->kvStore_.get(), std::move(adminClient));
+
+    auto threadPool = std::make_shared<folly::IOThreadPoolExecutor>(10);
+    std::vector<HostAddr> metaAddr = {HostAddr(localIp, localMetaPort)};
+
+    LOG(INFO) << "Create meta client...";
+    uint32_t tempDataPort = network::NetworkUtils::getAvailablePort();
+    HostAddr tempDataAddr(localIp, tempDataPort);
+    auto mClient = std::make_unique<MetaClient>(threadPool, metaAddr, tempDataAddr,
+                                                kClusterId, false);
+
+    auto ret = mClient->addHosts({tempDataAddr}).get();
+    ASSERT_TRUE(ret.ok());
+    mClient->waitForMetadReady();
+    ret = mClient->removeHosts({tempDataAddr}).get();
+    ASSERT_TRUE(ret.ok());
+
+    int partition = 9;
+    int replica = 3;
+    std::vector<HostAddr> peers;
+    std::vector<uint32_t> storagePorts;
+    std::vector<std::shared_ptr<MetaClient>> metaClients;
+
+    std::vector<std::unique_ptr<test::ServerContext>> serverContexts;
+    for (int i = 0; i < replica; i++) {
+        uint32_t storagePort = network::NetworkUtils::getAvailablePort();
+        HostAddr storageAddr(localIp, storagePort);
+        storagePorts.emplace_back(storagePort);
+        peers.emplace_back(storageAddr);
+
+        ret = mClient->addHosts({storageAddr}).get();
+        ASSERT_TRUE(ret.ok());
+        VLOG(1) << "The storage server has been added to the meta service";
+
+        auto metaClient = std::make_shared<MetaClient>(threadPool, metaAddr, storageAddr,
+                                                       kClusterId, true);
+        metaClient->waitForMetadReady();
+        metaClients.emplace_back(metaClient);
+    }
+
+    for (int i = 0; i < replica; i++) {
+        std::string dataPath = folly::stringPrintf("%s/%d/data", rootPath.path(), i);
+        auto sc = storage::TestUtils::mockStorageServer(metaClients[i].get(),
+                                                        dataPath.c_str(),
+                                                        localIp,
+                                                        storagePorts[i],
+                                                        true);
+        serverContexts.emplace_back(std::move(sc));
+    }
+
+    ret = mClient->createSpace("storage", partition, replica).get();
+    ASSERT_TRUE(ret.ok());
+    sleep(FLAGS_load_data_interval_secs + 1);
+    sleep(FLAGS_raft_heartbeat_interval_secs);
+
+    auto code = balancer.leaderBalance();
+    ASSERT_EQ(code, cpp2::ErrorCode::SUCCEEDED);
+
+    sleep(FLAGS_raft_heartbeat_interval_secs);
+    for (int i = 0; i < replica; i++) {
+        std::unordered_map<GraphSpaceID, std::vector<PartitionID>> leaderIds;
+        EXPECT_EQ(3, serverContexts[i]->kvStore_->allLeader(leaderIds));
+    }
+}
+
 }  // namespace meta
 }  // namespace nebula
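The expected value of 3 falls out of the test's shape: 9 partitions with replica factor 3 on exactly 3 hosts means every host replicates every partition, so a balanced outcome is 9 / 3 = 3 leaders per storage server. Spelled out:

// 9 partitions, 3 hosts that each hold a replica of all partitions:
// a perfectly balanced distribution gives each host 9 / 3 = 3 leaders.
constexpr int kPartitions = 9;
constexpr int kHosts = 3;
static_assert(kPartitions / kHosts == 3, "each storaged should lead 3 partitions");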
diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp
index 3c9fa601811..363261c62dc 100644
--- a/src/meta/test/BalancerTest.cpp
+++ b/src/meta/test/BalancerTest.cpp
@@ -14,6 +14,7 @@
 
 DECLARE_uint32(task_concurrency);
 DECLARE_int32(expired_threshold_sec);
+DECLARE_double(leader_balance_deviation);
 
 namespace nebula {
 namespace meta {
@@ -65,6 +66,13 @@ class TestFaultInjector : public FaultInjector {
         return response(6);
     }
 
+    folly::Future<Status> getLeaderDist(HostLeaderMap* hostLeaderMap) override {
+        (*hostLeaderMap)[HostAddr(0, 0)][1] = {1, 2, 3, 4, 5};
+        (*hostLeaderMap)[HostAddr(1, 1)][1] = {6, 7, 8};
+        (*hostLeaderMap)[HostAddr(2, 2)][1] = {9};
+        return response(7);
+    }
+
     void reset(std::vector<Status> sts) {
         statusArray_ = std::move(sts);
     }
@@ -123,7 +131,7 @@ TEST(BalanceTaskTest, SimpleTest) {
 }
 
 TEST(BalanceTest, BalancePartsTest) {
-    auto* balancer = Balancer::instance(nullptr);
+    std::unique_ptr<Balancer> balancer(new Balancer(nullptr, nullptr));
     auto dump = [](const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
                    const std::vector<BalanceTask>& tasks) {
         for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
@@ -615,6 +623,225 @@ TEST(BalanceTest, RecoveryTest) {
     }
 }
 
+void verifyLeaderBalancePlan(std::unordered_map<HostAddr, std::vector<PartitionID>> leaderCount,
+                             size_t minLoad, size_t maxLoad) {
+    for (const auto& hostEntry : leaderCount) {
+        EXPECT_GE(hostEntry.second.size(), minLoad);
+        EXPECT_LE(hostEntry.second.size(), maxLoad);
+    }
+}
+
+TEST(BalanceTest, SimpleLeaderBalancePlanTest) {
+    fs::TempDir rootPath("/tmp/SimpleLeaderBalancePlanTest.XXXXXX");
+    std::unique_ptr<kvstore::KVStore> kv(TestUtils::initKV(rootPath.path()));
+    std::vector<HostAddr> hosts = {{0, 0}, {1, 1}, {2, 2}};
+    TestUtils::createSomeHosts(kv.get(), hosts);
+    // 9 partitions in space 1, 3 replicas, 3 hosts
+    TestUtils::assembleSpace(kv.get(), 1, 9, 3, 3);
+
+    std::unique_ptr<AdminClient> client(new AdminClient(kv.get()));
+    std::unique_ptr<Balancer> balancer(new Balancer(kv.get(), std::move(client)));
+    {
+        HostLeaderMap hostLeaderMap;
+        hostLeaderMap[HostAddr(0, 0)][1] = {1, 2, 3, 4, 5};
+        hostLeaderMap[HostAddr(1, 1)][1] = {6, 7, 8};
+        hostLeaderMap[HostAddr(2, 2)][1] = {9};
+        auto tempMap = hostLeaderMap;
+
+        LeaderBalancePlan plan;
+        auto leaderParts = balancer->buildLeaderBalancePlan(&hostLeaderMap, 1, plan, false);
+        verifyLeaderBalancePlan(leaderParts, 3, 3);
+
+        // check that two successive plan builds from the same input are identical
+        LeaderBalancePlan tempPlan;
+        auto tempLeaderParts = balancer->buildLeaderBalancePlan(&tempMap, 1, tempPlan, false);
+        verifyLeaderBalancePlan(tempLeaderParts, 3, 3);
+        EXPECT_EQ(plan.size(), tempPlan.size());
+        for (size_t i = 0; i < plan.size(); i++) {
+            EXPECT_EQ(plan[i], tempPlan[i]);
+        }
+    }
+    {
+        HostLeaderMap hostLeaderMap;
+        hostLeaderMap[HostAddr(0, 0)][1] = {1, 2, 3, 4};
+        hostLeaderMap[HostAddr(1, 1)][1] = {5, 6, 7, 8};
+        hostLeaderMap[HostAddr(2, 2)][1] = {9};
+
+        LeaderBalancePlan plan;
+        auto leaderParts = balancer->buildLeaderBalancePlan(&hostLeaderMap, 1, plan, false);
+        verifyLeaderBalancePlan(leaderParts, 3, 3);
+    }
+    {
+        HostLeaderMap hostLeaderMap;
+        hostLeaderMap[HostAddr(0, 0)][1] = {};
+        hostLeaderMap[HostAddr(1, 1)][1] = {};
+        hostLeaderMap[HostAddr(2, 2)][1] = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+
+        LeaderBalancePlan plan;
+        auto leaderParts = balancer->buildLeaderBalancePlan(&hostLeaderMap, 1, plan, false);
+        verifyLeaderBalancePlan(leaderParts, 3, 3);
+    }
+    {
+        HostLeaderMap hostLeaderMap;
+        hostLeaderMap[HostAddr(0, 0)][1] = {1, 2, 3};
+        hostLeaderMap[HostAddr(1, 1)][1] = {4, 5, 6};
+        hostLeaderMap[HostAddr(2, 2)][1] = {7, 8, 9};
+
+        LeaderBalancePlan plan;
+        auto leaderParts = balancer->buildLeaderBalancePlan(&hostLeaderMap, 1, plan, false);
+        verifyLeaderBalancePlan(leaderParts, 3, 3);
+    }
+}
+
+TEST(BalanceTest, IntersectHostsLeaderBalancePlanTest) {
+    fs::TempDir rootPath("/tmp/IntersectHostsLeaderBalancePlanTest.XXXXXX");
+    std::unique_ptr<kvstore::KVStore> kv(TestUtils::initKV(rootPath.path()));
+    std::vector<HostAddr> hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}};
+    TestUtils::createSomeHosts(kv.get(), hosts);
+    // 7 partitions in space 1, 3 replicas, 6 hosts, so not every pair of hosts shares parts
+    TestUtils::assembleSpace(kv.get(), 1, 7, 3, 6);
+
+    std::unique_ptr<AdminClient> client(new AdminClient(kv.get()));
+    std::unique_ptr<Balancer> balancer(new Balancer(kv.get(), std::move(client)));
+    {
+        HostLeaderMap hostLeaderMap;
+        hostLeaderMap[HostAddr(0, 0)][1] = {4, 5, 6};
+        hostLeaderMap[HostAddr(1, 1)][1] = {};
+        hostLeaderMap[HostAddr(2, 2)][1] = {};
+        hostLeaderMap[HostAddr(3, 3)][1] = {1, 2, 3, 7};
+        hostLeaderMap[HostAddr(4, 4)][1] = {};
+        hostLeaderMap[HostAddr(5, 5)][1] = {};
+
+        LeaderBalancePlan plan;
+        auto leaderParts = balancer->buildLeaderBalancePlan(&hostLeaderMap, 1, plan, false);
+        verifyLeaderBalancePlan(leaderParts, 1, 2);
+    }
+    {
+        HostLeaderMap hostLeaderMap;
+        hostLeaderMap[HostAddr(0, 0)][1] = {};
+        hostLeaderMap[HostAddr(1, 1)][1] = {5, 6, 7};
+        hostLeaderMap[HostAddr(2, 2)][1] = {};
+        hostLeaderMap[HostAddr(3, 3)][1] = {1, 2};
+        hostLeaderMap[HostAddr(4, 4)][1] = {};
+        hostLeaderMap[HostAddr(5, 5)][1] = {3, 4};
+
+        LeaderBalancePlan plan;
+        auto leaderParts = balancer->buildLeaderBalancePlan(&hostLeaderMap, 1, plan, false);
+        verifyLeaderBalancePlan(leaderParts, 1, 2);
+    }
+    {
+        HostLeaderMap hostLeaderMap;
+        hostLeaderMap[HostAddr(0, 0)][1] = {};
+        hostLeaderMap[HostAddr(1, 1)][1] = {1, 5};
+        hostLeaderMap[HostAddr(2, 2)][1] = {2, 6};
+        hostLeaderMap[HostAddr(3, 3)][1] = {3, 7};
+        hostLeaderMap[HostAddr(4, 4)][1] = {4};
+        hostLeaderMap[HostAddr(5, 5)][1] = {};
+
+        LeaderBalancePlan plan;
+        auto leaderParts = balancer->buildLeaderBalancePlan(&hostLeaderMap, 1, plan, false);
+        verifyLeaderBalancePlan(leaderParts, 1, 2);
+    }
+    {
+        HostLeaderMap hostLeaderMap;
+        hostLeaderMap[HostAddr(0, 0)][1] = {5, 6};
+        hostLeaderMap[HostAddr(1, 1)][1] = {1, 7};
+        hostLeaderMap[HostAddr(2, 2)][1] = {};
+        hostLeaderMap[HostAddr(3, 3)][1] = {};
+        hostLeaderMap[HostAddr(4, 4)][1] = {2, 3, 4};
+        hostLeaderMap[HostAddr(5, 5)][1] = {};
+
+        LeaderBalancePlan plan;
+        auto leaderParts = balancer->buildLeaderBalancePlan(&hostLeaderMap, 1, plan, false);
+        verifyLeaderBalancePlan(leaderParts, 1, 2);
+    }
+    {
+        HostLeaderMap hostLeaderMap;
+        hostLeaderMap[HostAddr(0, 0)][1] = {6};
+        hostLeaderMap[HostAddr(1, 1)][1] = {1, 7};
+        hostLeaderMap[HostAddr(2, 2)][1] = {2};
+        hostLeaderMap[HostAddr(3, 3)][1] = {3};
+        hostLeaderMap[HostAddr(4, 4)][1] = {4};
+        hostLeaderMap[HostAddr(5, 5)][1] = {5};
+
+        LeaderBalancePlan plan;
+        auto leaderParts = balancer->buildLeaderBalancePlan(&hostLeaderMap, 1, plan, false);
+        verifyLeaderBalancePlan(leaderParts, 1, 2);
+    }
+}
+
+TEST(BalanceTest, ManyHostsLeaderBalancePlanTest) {
+    fs::TempDir rootPath("/tmp/ManyHostsLeaderBalancePlanTest.XXXXXX");
+    std::unique_ptr<kvstore::KVStore> kv(TestUtils::initKV(rootPath.path()));
+    FLAGS_expired_threshold_sec = 600;
+
+    int partCount = 99999;
+    int replica = 3;
+    int hostCount = 100;
+    std::vector<HostAddr> hosts;
+    for (int i = 0; i < hostCount; i++) {
+        hosts.emplace_back(i, i);
+    }
+    TestUtils::createSomeHosts(kv.get(), hosts);
+    TestUtils::assembleSpace(kv.get(), 1, partCount, replica, hostCount);
+
+    float avgLoad = static_cast<float>(partCount) / hostCount;
+    int32_t minLoad = std::floor(avgLoad * (1 - FLAGS_leader_balance_deviation));
+    int32_t maxLoad = std::ceil(avgLoad * (1 + FLAGS_leader_balance_deviation));
+
+    std::unique_ptr<AdminClient> client(new AdminClient(kv.get()));
+    std::unique_ptr<Balancer> balancer(new Balancer(kv.get(), std::move(client)));
+    // check several times that the result stays balanced
+    for (int count = 0; count < 1; count++) {
+        HostLeaderMap hostLeaderMap;
+        // every partition randomly picks a leader among its peers
+        for (int partId = 1; partId <= partCount; partId++) {
+            std::vector<HostAddr> peers;
+            size_t idx = partId;
+            for (int32_t i = 0; i < replica; i++, idx++) {
+                peers.emplace_back(hosts[idx % hostCount]);
+            }
+            ASSERT_EQ(peers.size(), replica);
+            auto leader = peers[folly::Random::rand32(peers.size())];
+            hostLeaderMap[leader][1].emplace_back(partId);
+        }
+
+        LeaderBalancePlan plan;
+        auto leaderParts = balancer->buildLeaderBalancePlan(&hostLeaderMap, 1, plan);
+        verifyLeaderBalancePlan(leaderParts, minLoad, maxLoad);
+    }
+}
+
+TEST(BalanceTest, LeaderBalanceTest) {
+    fs::TempDir rootPath("/tmp/LeaderBalanceTest.XXXXXX");
+    std::unique_ptr<kvstore::KVStore> kv(TestUtils::initKV(rootPath.path()));
+    std::vector<HostAddr> hosts = {{0, 0}, {1, 1}, {2, 2}};
+    TestUtils::createSomeHosts(kv.get(), hosts);
+    TestUtils::assembleSpace(kv.get(), 1, 9, 3, 3);
+    {
+        cpp2::SpaceProperties properties;
+        properties.set_space_name("default_space");
+        properties.set_partition_num(9);
+        properties.set_replica_factor(3);
+        cpp2::CreateSpaceReq req;
+        req.set_properties(std::move(properties));
+        auto* processor = CreateSpaceProcessor::instance(kv.get());
+        auto f = processor->getFuture();
+        processor->process(req);
+        auto resp = std::move(f).get();
+        ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code);
+        ASSERT_EQ(1, resp.get_id().get_space_id());
+    }
+
+    std::vector<Status> sts(8, Status::OK());
+    std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
+    auto client = std::make_unique<AdminClient>(std::move(injector));
+
+    Balancer balancer(kv.get(), std::move(client));
+    auto ret = balancer.leaderBalance();
+    ASSERT_EQ(ret, cpp2::ErrorCode::SUCCEEDED);
+}
+
 }  // namespace meta
 }  // namespace nebula
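Worth noting how the test bounds relate to the planner's: with 99999 partitions over 100 hosts the average is 999.99, and the test accepts [floor(999.99 * 0.95), ceil(999.99 * 1.05)] = [949, 1050], which is slightly looser than the ceil/floor bounds buildLeaderBalancePlan enforces internally. A quick standalone check of that arithmetic (not part of the patch):

#include <cassert>
#include <cmath>

int main() {
    float avg = 99999.0f / 100;                 // 999.99
    int minLoad = std::floor(avg * (1 - 0.05)); // floor(949.9905)  = 949
    int maxLoad = std::ceil(avg * (1 + 0.05));  // ceil(1049.9895) = 1050
    assert(minLoad == 949 && maxLoad == 1050);
    return 0;
}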
diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt
index 20c4f923c3a..fdf5e1bc6d3 100644
--- a/src/meta/test/CMakeLists.txt
+++ b/src/meta/test/CMakeLists.txt
@@ -351,6 +351,7 @@ nebula_add_test(
         $
         $
         $
+        $
     LIBRARIES
         ${ROCKSDB_LIBRARIES}
         ${THRIFT_LIBRARIES}
diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp
index dfbaa8ce8d3..275e22b86b0 100644
--- a/src/meta/test/MetaClientTest.cpp
+++ b/src/meta/test/MetaClientTest.cpp
@@ -52,7 +52,9 @@ TEST(MetaClientTest, InterfacesTest) {
         auto ret = client->listHosts().get();
         ASSERT_TRUE(ret.ok());
         for (auto i = 0u; i < hosts.size(); i++) {
-            ASSERT_EQ(hosts[i], ret.value()[i].first);
+            auto tHost = ret.value()[i].hostAddr;
+            auto hostAddr = HostAddr(tHost.ip, tHost.port);
+            ASSERT_EQ(hosts[i], hostAddr);
         }
     }
     {
@@ -421,7 +423,9 @@ TEST(MetaClientTest, DiffTest) {
         auto ret = client->listHosts().get();
         ASSERT_TRUE(ret.ok());
         for (auto i = 0u; i < hosts.size(); i++) {
-            ASSERT_EQ(hosts[i], ret.value()[i].first);
+            auto tHost = ret.value()[i].hostAddr;
+            auto hostAddr = HostAddr(tHost.ip, tHost.port);
+            ASSERT_EQ(hosts[i], hostAddr);
         }
     }
     {
@@ -476,7 +480,9 @@ TEST(MetaClientTest, HeartbeatTest) {
         auto ret = client->listHosts().get();
         ASSERT_TRUE(ret.ok());
         for (auto i = 0u; i < hosts.size(); i++) {
-            ASSERT_EQ(hosts[i], ret.value()[i].first);
+            auto tHost = ret.value()[i].hostAddr;
+            auto hostAddr = HostAddr(tHost.ip, tHost.port);
+            ASSERT_EQ(hosts[i], hostAddr);
         }
     }
     sleep(FLAGS_heartbeat_interval_secs + 1);
diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h
index 4fd585d4056..04e185c8100 100644
--- a/src/meta/test/TestUtils.h
+++ b/src/meta/test/TestUtils.h
@@ -117,13 +117,29 @@ class TestUtils {
         return hosts.size();
     }
 
-    static bool assembleSpace(kvstore::KVStore* kv, GraphSpaceID id, int32_t partitionNum) {
+    static bool assembleSpace(kvstore::KVStore* kv, GraphSpaceID id, int32_t partitionNum,
+                              int32_t replica = 1, int32_t totalHost = 1) {
+        // mock the part distribution in the same way createSpace does
         bool ret = false;
+        cpp2::SpaceProperties properties;
+        properties.set_space_name("test_space");
+        properties.set_partition_num(partitionNum);
+        properties.set_replica_factor(replica);
+        auto spaceVal = MetaServiceUtils::spaceVal(properties);
         std::vector<nebula::kvstore::KV> data;
-        data.emplace_back(MetaServiceUtils::spaceKey(id), "test_space");
+        data.emplace_back(MetaServiceUtils::spaceKey(id), std::move(spaceVal));
+
+        std::vector<nebula::cpp2::HostAddr> allHosts;
+        for (int i = 0; i < totalHost; i++) {
+            allHosts.emplace_back(apache::thrift::FragileConstructor::FRAGILE, i, i);
+        }
+
         for (auto partId = 1; partId <= partitionNum; partId++) {
             std::vector<nebula::cpp2::HostAddr> hosts;
-            hosts.emplace_back(apache::thrift::FragileConstructor::FRAGILE, 0, 0);
+            size_t idx = partId;
+            for (int32_t i = 0; i < replica; i++, idx++) {
+                hosts.emplace_back(allHosts[idx % totalHost]);
+            }
             data.emplace_back(MetaServiceUtils::partKey(id, partId),
                               MetaServiceUtils::partVal(hosts));
         }
diff --git a/src/parser/AdminSentences.cpp b/src/parser/AdminSentences.cpp
index 41a86961fd1..dd10040ca43 100644
--- a/src/parser/AdminSentences.cpp
+++ b/src/parser/AdminSentences.cpp
@@ -145,4 +145,14 @@ std::string ConfigSentence::toString() const {
     return "Unknown";
 }
 
+std::string BalanceSentence::toString() const {
+    switch (subType_) {
+        case SubType::kLeader:
+            return std::string("BALANCE LEADER");
+        default:
+            FLOG_FATAL("Type illegal");
+    }
+    return "Unknown";
+}
+
 }   // namespace nebula
diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h
index e742474fac3..689b608df3e 100644
--- a/src/parser/AdminSentences.h
+++ b/src/parser/AdminSentences.h
@@ -354,6 +354,29 @@ class ConfigSentence final : public Sentence {
     std::unique_ptr<ConfigRowItem> configItem_;
 };
 
+class BalanceSentence final : public Sentence {
+public:
+    enum class SubType : uint32_t {
+        kUnknown,
+        kLeader,
+    };
+
+    // TODO: add more subtypes for balance
+    explicit BalanceSentence(SubType subType) {
+        kind_ = Kind::kBalance;
+        subType_ = subType;
+    }
+
+    std::string toString() const override;
+
+    SubType subType() const {
+        return subType_;
+    }
+
+private:
+    SubType subType_{SubType::kUnknown};
+};
+
 }   // namespace nebula
 
 #endif  // PARSER_ADMINSENTENCES_H_
diff --git a/src/parser/Sentence.h b/src/parser/Sentence.h
index 7aa22cfda10..fe118c965f7 100644
--- a/src/parser/Sentence.h
+++ b/src/parser/Sentence.h
@@ -56,6 +56,7 @@ class Sentence {
         kConfig,
         kFetchVertices,
         kFetchEdges,
+        kBalance,
     };
 
     Kind kind() const {
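Taken together, these pieces give the new statement a concrete shape in the AST. A short sketch of what the grammar action below hands to the executor; every fact in the comments follows from the classes in this patch:

// What "BALANCE LEADER" reduces to.
auto* sentence = new BalanceSentence(BalanceSentence::SubType::kLeader);
// sentence->kind()     == Sentence::Kind::kBalance
// sentence->subType()  == BalanceSentence::SubType::kLeader
// sentence->toString() == "BALANCE LEADER"
delete sentence;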
diff --git a/src/parser/parser.yy b/src/parser/parser.yy
index 6d9d8efd9c0..5d3947cb19a 100644
--- a/src/parser/parser.yy
+++ b/src/parser/parser.yy
@@ -104,6 +104,7 @@ class GraphScanner;
 %token KW_ORDER KW_ASC
 %token KW_FETCH KW_PROP
 %token KW_DISTINCT KW_ALL
+%token KW_BALANCE KW_LEADER
 
 /* symbols */
 %token L_PAREN R_PAREN L_BRACKET R_BRACKET L_BRACE R_BRACE COMMA
 %token PIPE OR AND LT LE GT GE EQ NE PLUS MINUS MUL DIV MOD NOT NEG ASSIGN
@@ -195,7 +196,7 @@ class GraphScanner;
 %type <sentence> create_user_sentence alter_user_sentence drop_user_sentence change_password_sentence
 %type <sentence> grant_sentence revoke_sentence
 %type <sentence> download_sentence
-%type <sentence> set_config_sentence get_config_sentence
+%type <sentence> set_config_sentence get_config_sentence balance_sentence
 
 %type <sentence> sentence
 %type <sentences> sentences
@@ -1483,6 +1484,12 @@ set_config_sentence
     }
     ;
 
+balance_sentence
+    : KW_BALANCE KW_LEADER {
+        $$ = new BalanceSentence(BalanceSentence::SubType::kLeader);
+    }
+    ;
+
 mutate_sentence
     : insert_vertex_sentence { $$ = $1; }
     | insert_edge_sentence { $$ = $1; }
@@ -1523,6 +1530,7 @@ maintain_sentence
     | revoke_sentence { $$ = $1; }
     | get_config_sentence { $$ = $1; }
     | set_config_sentence { $$ = $1; }
+    | balance_sentence { $$ = $1; }
     ;
 
 sentence
diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex
index 372b669ae8f..eb25f3527bd 100644
--- a/src/parser/scanner.lex
+++ b/src/parser/scanner.lex
@@ -113,6 +113,8 @@ STORAGE ([Ss][Tt][Oo][Rr][Aa][Gg][Ee])
 FETCH ([Ff][Ee][Tt][Cc][Hh])
 PROP ([Pp][Rr][Oo][Pp])
 ALL ([Aa][Ll][Ll])
+BALANCE ([Bb][Aa][Ll][Aa][Nn][Cc][Ee])
+LEADER ([Ll][Ee][Aa][Dd][Ee][Rr])
 
 LABEL ([a-zA-Z][_a-zA-Z0-9]*)
 DEC ([0-9])
@@ -215,6 +217,8 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])
 {FETCH} { return TokenType::KW_FETCH; }
 {PROP} { return TokenType::KW_PROP; }
 {ALL} { return TokenType::KW_ALL; }
+{BALANCE} { return TokenType::KW_BALANCE; }
+{LEADER} { return TokenType::KW_LEADER; }
 
 "." { return TokenType::DOT; }
 "," { return TokenType::COMMA; }
diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp
index c796ca447e1..efbf5f6a006 100644
--- a/src/parser/test/ParserTest.cpp
+++ b/src/parser/test/ParserTest.cpp
@@ -1146,4 +1146,13 @@ TEST(Parser, ConfigOperation) {
     }
 }
 
+TEST(Parser, BalanceOperation) {
+    {
+        GQLParser parser;
+        std::string query = "BALANCE LEADER";
+        auto result = parser.parse(query);
+        ASSERT_TRUE(result.ok()) << result.status();
+    }
+}
+
 }   // namespace nebula
diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp
index 1110e684a7f..36f49c6ee65 100644
--- a/src/parser/test/ScannerTest.cpp
+++ b/src/parser/test/ScannerTest.cpp
@@ -360,6 +360,12 @@ TEST(Scanner, Basic) {
         CHECK_SEMANTIC_TYPE("Variables", TokenType::KW_VARIABLES),
         CHECK_SEMANTIC_TYPE("ALL", TokenType::KW_ALL),
         CHECK_SEMANTIC_TYPE("all", TokenType::KW_ALL),
+        CHECK_SEMANTIC_TYPE("BALANCE", TokenType::KW_BALANCE),
+        CHECK_SEMANTIC_TYPE("Balance", TokenType::KW_BALANCE),
+        CHECK_SEMANTIC_TYPE("balance", TokenType::KW_BALANCE),
+        CHECK_SEMANTIC_TYPE("LEADER", TokenType::KW_LEADER),
+        CHECK_SEMANTIC_TYPE("Leader", TokenType::KW_LEADER),
+        CHECK_SEMANTIC_TYPE("leader", TokenType::KW_LEADER),
 
         CHECK_SEMANTIC_TYPE("_type", TokenType::TYPE_PROP),
         CHECK_SEMANTIC_TYPE("_id", TokenType::ID_PROP),
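Since both keywords are matched case-insensitively by the scanner rules above, any casing of the statement parses. A hypothetical console session; the reply text is illustrative, not taken from this patch:

nebula> BALANCE LEADER
Execution succeeded

nebula> balance leader
Execution succeeded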
diff --git a/src/storage/AdminProcessor.h b/src/storage/AdminProcessor.h
index f40f7d39d63..54fac66269c 100644
--- a/src/storage/AdminProcessor.h
+++ b/src/storage/AdminProcessor.h
@@ -9,6 +9,8 @@
 
 #include "base/Base.h"
 #include "storage/BaseProcessor.h"
+#include "kvstore/NebulaStore.h"
+#include "kvstore/Part.h"
 
 namespace nebula {
 namespace storage {
@@ -20,7 +22,39 @@ class TransLeaderProcessor : public BaseProcessor<cpp2::AdminExecResp> {
     }
 
     void process(const cpp2::TransLeaderReq& req) {
-        UNUSED(req);
+        CHECK_NOTNULL(kvstore_);
+        auto spaceId = req.get_space_id();
+        auto partId = req.get_part_id();
+        auto ret = kvstore_->part(spaceId, partId);
+        if (!ok(ret)) {
+            resp_.set_code(to(error(ret)));
+            promise_.setValue(std::move(resp_));
+            delete this;
+            return;
+        }
+        auto part = nebula::value(ret);
+        auto host = kvstore::NebulaStore::getRaftAddr(HostAddr(req.get_new_leader().get_ip(),
+                                                               req.get_new_leader().get_port()));
+        part->asyncTransferLeader(std::move(host),
+                                  [this, spaceId, partId] (kvstore::ResultCode code) {
+            auto leaderRet = kvstore_->partLeader(spaceId, partId);
+            CHECK(ok(leaderRet));
+            if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) {
+                auto addr = value(std::move(leaderRet));
+                if (addr == HostAddr(0, 0)) {
+                    // No leader has been elected, just return ok
+                    code = kvstore::ResultCode::SUCCEEDED;
+                } else {
+                    nebula::cpp2::HostAddr leader;
+                    leader.set_ip(addr.first);
+                    leader.set_port(addr.second);
+                    resp_.set_leader(std::move(leader));
+                }
+            }
+            resp_.set_code(to(code));
+            promise_.setValue(std::move(resp_));
+            delete this;
+        });
     }
 
 private:
@@ -102,6 +136,29 @@ class WaitingForCatchUpDataProcessor : public BaseProcessor<cpp2::AdminExecResp
     explicit WaitingForCatchUpDataProcessor(kvstore::KVStore* kvstore)
             : BaseProcessor<cpp2::AdminExecResp>(kvstore, nullptr) {}
 };
 
+
+class GetLeaderProcessor : public BaseProcessor<cpp2::GetLeaderResp> {
+public:
+    static GetLeaderProcessor* instance(kvstore::KVStore* kvstore) {
+        return new GetLeaderProcessor(kvstore);
+    }
+
+    void process(const cpp2::GetLeaderReq& req) {
+        UNUSED(req);
+        CHECK_NOTNULL(kvstore_);
+        std::unordered_map<GraphSpaceID, std::vector<PartitionID>> leaderIds;
+        kvstore_->allLeader(leaderIds);
+        resp_.set_code(to(kvstore::ResultCode::SUCCEEDED));
+        resp_.set_leader_parts(std::move(leaderIds));
+        promise_.setValue(std::move(resp_));
+        delete this;
+    }
+
+private:
+    explicit GetLeaderProcessor(kvstore::KVStore* kvstore)
+            : BaseProcessor<cpp2::GetLeaderResp>(kvstore, nullptr) {}
+};
+
 }  // namespace storage
 }  // namespace nebula
diff --git a/src/storage/StorageServiceHandler.cpp b/src/storage/StorageServiceHandler.cpp
index 8236704f318..07c760d5d89 100644
--- a/src/storage/StorageServiceHandler.cpp
+++ b/src/storage/StorageServiceHandler.cpp
@@ -114,6 +114,12 @@ StorageServiceHandler::future_memberChange(const cpp2::MemberChangeReq& req) {
     RETURN_FUTURE(processor);
 }
 
+folly::Future<cpp2::GetLeaderResp>
+StorageServiceHandler::future_getLeaderPart(const cpp2::GetLeaderReq& req) {
+    auto* processor = GetLeaderProcessor::instance(kvstore_);
+    RETURN_FUTURE(processor);
+}
+
 }  // namespace storage
 }  // namespace nebula
diff --git a/src/storage/StorageServiceHandler.h b/src/storage/StorageServiceHandler.h
index 1469b243a69..539ca495c6a 100644
--- a/src/storage/StorageServiceHandler.h
+++ b/src/storage/StorageServiceHandler.h
@@ -68,6 +68,9 @@ class StorageServiceHandler final : public cpp2::StorageServiceSvIf {
     folly::Future<cpp2::AdminExecResp>
     future_memberChange(const cpp2::MemberChangeReq& req) override;
 
+    folly::Future<cpp2::GetLeaderResp>
+    future_getLeaderPart(const cpp2::GetLeaderReq& req) override;
+
 private:
     kvstore::KVStore* kvstore_ = nullptr;
     meta::SchemaManager* schemaMan_;
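For completeness, this is roughly what a caller of the new storage endpoint does; on the meta side the real caller is AdminClient::getLeaderDist. The client setup and accessor shapes below are assumptions about the generated thrift code, not code from this patch:

// Hypothetical Thrift call against the new endpoint.
cpp2::GetLeaderReq req;
auto resp = client->future_getLeaderPart(req).get();
if (resp.get_code() == cpp2::ErrorCode::SUCCEEDED) {
    // leader_parts maps each space to the partitions this storaged leads
    for (const auto& entry : resp.get_leader_parts()) {
        LOG(INFO) << "space " << entry.first << " leads "
                  << entry.second.size() << " partition(s)";
    }
}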