Skip to content

Commit

Permalink
Leader balance (vesoft-inc#731)
Browse files Browse the repository at this point in the history
Basic leader balance for storage: keep transferring leaders from the host with the most leaders to the host with the fewest leaders until all hosts are balanced.

Modify/add two commands in the console:

1. "show hosts" now also displays the total leader count and the leader count in each space
2. Add the command "balance leader", which tries to balance leaders across the different hosts
  • Loading branch information
critical27 authored and dangleptr committed Aug 28, 2019
1 parent 3b2f3cf commit c7c1818
Show file tree
Hide file tree
Showing 57 changed files with 1,739 additions and 175 deletions.
64 changes: 64 additions & 0 deletions src/graph/BalanceExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "graph/BalanceExecutor.h"

namespace nebula {
namespace graph {

// Binds the executor to its execution context and keeps the parsed statement,
// downcast to BalanceSentence, so that execute() can inspect its sub-type.
// The downcast is an unchecked static_cast.
BalanceExecutor::BalanceExecutor(Sentence *sentence, ExecutionContext *ectx)
    : Executor(ectx),
      sentence_(static_cast<BalanceSentence*>(sentence)) {
}

// Performs no preparation work; always returns Status::OK().
Status BalanceExecutor::prepare() {
return Status::OK();
}

// Dispatches on the sub-type of the BALANCE statement:
//   kLeader  -> start the asynchronous leader balance
//   kUnknown -> report "Type unknown" through onError_
void BalanceExecutor::execute() {
    switch (sentence_->subType()) {
        case BalanceSentence::SubType::kLeader: {
            balanceLeader();
            return;
        }
        case BalanceSentence::SubType::kUnknown: {
            onError_(Status::Error("Type unknown"));
            return;
        }
    }
}

void BalanceExecutor::balanceLeader() {
auto future = ectx()->getMetaClient()->balanceLeader();
auto *runner = ectx()->rctx()->runner();

auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
return;
}
auto ret = std::move(resp).value();
if (!ret) {
DCHECK(onError_);
onError_(Status::Error("Balance leader failed"));
return;
}
DCHECK(onFinish_);
onFinish_();
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error("Internal error"));
return;
};

std::move(future).via(runner).thenValue(cb).thenError(error);
}

} // namespace graph
} // namespace nebula
38 changes: 38 additions & 0 deletions src/graph/BalanceExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef GRAPH_BALANCEEXECUTOR_H_
#define GRAPH_BALANCEEXECUTOR_H_

#include "base/Base.h"
#include "graph/Executor.h"

namespace nebula {
namespace graph {

// Executor for BALANCE statements; Executor::makeExecutor creates it for
// sentences of kind Sentence::Kind::kBalance.
class BalanceExecutor final : public Executor {
public:
// `sentence` is stored after a static_cast to BalanceSentence*; `ectx` is
// forwarded to the Executor base class.
BalanceExecutor(Sentence *sentence, ExecutionContext *ectx);

// Name of this executor.
const char* name() const override {
return "BalanceExecutor";
}

// Does no work and returns Status::OK().
Status MUST_USE_RESULT prepare() override;

// Dispatches on the sentence's sub-type: kLeader calls balanceLeader(),
// kUnknown reports an error through onError_.
void execute() override;

// Requests a leader balance through the meta client and signals the result
// via onFinish_ / onError_ once the returned future completes.
void balanceLeader();

private:
// Parsed statement this executor runs; set in the constructor.
BalanceSentence *sentence_{nullptr};
// NOTE(review): nothing in BalanceExecutor.cpp assigns or reads resp_ —
// confirm whether it is still needed.
std::unique_ptr<cpp2::ExecutionResponse> resp_;
};

} // namespace graph
} // namespace nebula

#endif // GRAPH_BALANCEEXECUTOR_H_
1 change: 1 addition & 0 deletions src/graph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ add_library(
OrderByExecutor.cpp
IngestExecutor.cpp
ConfigExecutor.cpp
BalanceExecutor.cpp
SchemaHelper.cpp
FetchVerticesExecutor.cpp
FetchEdgesExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/graph/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "graph/SetExecutor.h"
#include "graph/FindExecutor.h"
#include "graph/MatchExecutor.h"
#include "graph/BalanceExecutor.h"

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -139,6 +140,9 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) {
case Sentence::Kind::kFind:
executor = std::make_unique<FindExecutor>(sentence, ectx());
break;
case Sentence::Kind::kBalance:
executor = std::make_unique<BalanceExecutor>(sentence, ectx());
break;
case Sentence::Kind::kUnknown:
LOG(FATAL) << "Sentence kind unknown";
break;
Expand Down
49 changes: 42 additions & 7 deletions src/graph/ShowExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,53 @@ void ShowExecutor::showHosts() {
return;
}

auto retShowHosts = std::move(resp).value();
auto hostItems = std::move(resp).value();
std::vector<cpp2::RowValue> rows;
std::vector<std::string> header{"Ip", "Port", "Status"};
std::vector<std::string> header{"Ip", "Port", "Status", "Leader count",
"Leader distribution", "Partition distribution"};
resp_ = std::make_unique<cpp2::ExecutionResponse>();
resp_->set_column_names(std::move(header));

for (auto &status : retShowHosts) {
for (auto& item : hostItems) {
std::vector<cpp2::ColumnValue> row;
row.resize(3);
row[0].set_str(NetworkUtils::ipFromHostAddr(status.first));
row[1].set_str(folly::to<std::string>(NetworkUtils::portFromHostAddr(status.first)));
row[2].set_str(status.second);
row.resize(6);
auto hostAddr = HostAddr(item.hostAddr.ip, item.hostAddr.port);
row[0].set_str(NetworkUtils::ipFromHostAddr(hostAddr));
row[1].set_str(folly::to<std::string>(NetworkUtils::portFromHostAddr(hostAddr)));
switch (item.get_status()) {
case meta::cpp2::HostStatus::ONLINE:
row[2].set_str("online");
break;
case meta::cpp2::HostStatus::OFFLINE:
case meta::cpp2::HostStatus::UNKNOWN:
row[2].set_str("offline");
break;
}

int32_t leaderCount = 0;
std::string leaders;
for (auto& spaceEntry : item.get_leader_parts()) {
leaderCount += spaceEntry.second.size();
leaders += "space " + folly::to<std::string>(spaceEntry.first) + ": " +
folly::to<std::string>(spaceEntry.second.size()) + ", ";
}
if (!leaders.empty()) {
leaders.resize(leaders.size() - 2);
}

row[3].set_integer(leaderCount);
row[4].set_str(leaders);

std::string parts;
for (auto& spaceEntry : item.get_all_parts()) {
parts += "space " + folly::to<std::string>(spaceEntry.first) + ": " +
folly::to<std::string>(spaceEntry.second.size()) + ", ";
}
if (!parts.empty()) {
parts.resize(parts.size() - 2);
}
row[5].set_str(parts);

rows.emplace_back();
rows.back().set_columns(std::move(row));
}
Expand Down
13 changes: 8 additions & 5 deletions src/graph/test/SchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ TEST_F(SchemaTest, metaCommunication) {
cpp2::ExecutionResponse resp;
std::string query = "SHOW HOSTS";
client->execute(query, resp);
std::vector<uniform_tuple_t<std::string, 3>> expected{
{"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online"},
std::vector<std::tuple<std::string, std::string, std::string,
int, std::string, std::string>> expected {
{"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online", 0, "", ""},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
Expand Down Expand Up @@ -254,7 +255,7 @@ TEST_F(SchemaTest, metaCommunication) {
// Test existent tag
{
cpp2::ExecutionResponse resp;
std::string query = "CREATE TAG person(id int, balance double)";
std::string query = "CREATE TAG person(id int)";
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code);
}
Expand Down Expand Up @@ -640,6 +641,7 @@ TEST_F(SchemaTest, metaCommunication) {
client->execute(query, resp);
ASSERT_EQ(1, (*(resp.get_rows())).size());
}
sleep(FLAGS_load_data_interval_secs + 1);
}


Expand All @@ -650,8 +652,9 @@ TEST_F(SchemaTest, TTLtest) {
cpp2::ExecutionResponse resp;
std::string query = "SHOW HOSTS";
client->execute(query, resp);
std::vector<uniform_tuple_t<std::string, 3>> expected{
{"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online"},
std::vector<std::tuple<std::string, std::string, std::string,
int, std::string, std::string>> expected {
{"127.0.0.1", std::to_string(gEnv->storageServerPort()), "online", 0, "", ""},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
Expand Down
6 changes: 6 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ enum HostStatus {
// One host entry: its address, its status, and two per-space partition lists.
struct HostItem {
1: common.HostAddr hostAddr,
2: HostStatus status,
// Partition ids grouped by graph space. SHOW HOSTS sums the list sizes to
// produce "Leader count" and prints the per-space sizes as "Leader distribution".
3: map<common.GraphSpaceID, list<common.PartitionID>> (cpp.template = "std::unordered_map") leader_parts,
// Partition ids grouped by graph space. SHOW HOSTS prints the per-space sizes
// as "Partition distribution".
4: map<common.GraphSpaceID, list<common.PartitionID>> (cpp.template = "std::unordered_map") all_parts,
}

struct UserItem {
Expand Down Expand Up @@ -437,6 +439,9 @@ struct BalanceResp {
3: common.HostAddr leader,
}

// Request type of MetaService.leaderBalance; it carries no fields.
struct LeaderBalanceReq {
}

enum ConfigModule {
UNKNOWN = 0x00,
ALL = 0x01,
Expand Down Expand Up @@ -540,6 +545,7 @@ service MetaService {

HBResp heartBeat(1: HBReq req);
BalanceResp balance(1: BalanceReq req);
ExecResp leaderBalance(1: LeaderBalanceReq req);

ExecResp regConfig(1: RegConfigReq req);
GetConfigResp getConfig(1: GetConfigReq req);
Expand Down
11 changes: 10 additions & 1 deletion src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,18 @@ struct AddLearnerReq {

struct CatchUpDataReq {
1: common.GraphSpaceID space_id,
2: common.GraphSpaceID part_id,
2: common.PartitionID part_id,
3: common.HostAddr target,
}

// Request type of StorageService.getLeaderPart; it carries no fields.
struct GetLeaderReq {
}

// Response type of StorageService.getLeaderPart.
struct GetLeaderResp {
// Result code of the call.
1: ErrorCode code,
// Partition ids grouped by graph space; presumably the parts for which the
// responding storage host is currently leader — TODO confirm against the handler.
2: map<common.GraphSpaceID, list<common.PartitionID>> (cpp.template = "std::unordered_map") leader_parts;
}


service StorageService {
QueryResponse getOutBound(1: GetNeighborsRequest req)
Expand All @@ -225,5 +233,6 @@ service StorageService {
AdminExecResp waitingForCatchUpData(1: CatchUpDataReq req);
AdminExecResp removePart(1: RemovePartReq req);
AdminExecResp memberChange(1: MemberChangeReq req);
GetLeaderResp getLeaderPart(1: GetLeaderReq req);
}

1 change: 1 addition & 0 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class KVFilter {

using KV = std::pair<std::string, std::string>;
using KVCallback = folly::Function<void(ResultCode code)>;
using NewLeaderCallback = folly::Function<void(HostAddr nLeader)>;

inline rocksdb::Slice toSlice(const folly::StringPiece& str) {
return rocksdb::Slice(str.begin(), str.size());
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class KVEngine {
// Remove partId from current storage engine.
virtual void removePart(PartitionID partId) = 0;


// Return all partIds current storage engine holds.
virtual std::vector<PartitionID> allParts() = 0;

Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ class KVStore {

virtual ResultCode ingest(GraphSpaceID spaceId) = 0;

// `leaderIds` is an out-parameter mapping each space id to a list of partition ids.
// NOTE(review): presumably it is filled with the parts this store currently leads and
// the return value is the total number of such parts — confirm against the implementation.
virtual int32_t allLeader(std::unordered_map<GraphSpaceID,
std::vector<PartitionID>>& leaderIds) = 0;

virtual ErrorOr<ResultCode, std::shared_ptr<Part>> part(GraphSpaceID spaceId,
PartitionID partId) = 0;

Expand Down
30 changes: 30 additions & 0 deletions src/kvstore/LogEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ std::string encodeLearner(const HostAddr& learner) {
// Log type
auto type = LogType::OP_ADD_LEARNER;
encoded.append(reinterpret_cast<char*>(&type), 1);
// Value length
uint32_t len = static_cast<uint32_t>(sizeof(HostAddr));
encoded.append(reinterpret_cast<char*>(&len), sizeof(len));
// Learner addr
encoded.append(reinterpret_cast<const char*>(&learner), sizeof(HostAddr));
return encoded;
}
Expand All @@ -167,6 +171,32 @@ HostAddr decodeLearner(const std::string& encoded) {
return addr;
}

// Serializes a "transfer leader" log entry. Bytes are appended in this order:
//   [timestamp: int64_t, wall clock in ms][log type: 1 byte, OP_TRANS_LEADER]
//   [value length: uint32_t = sizeof(HostAddr)][raw bytes of targetAddr]
// All fields are copied in host byte order with no conversion.
// NOTE(review): kHeadLen is defined outside this excerpt; presumably it equals
// the size of the first three fields — confirm.
std::string encodeTransLeader(const HostAddr& targetAddr) {
    const int64_t nowMs = time::WallClock::fastNowInMilliSec();
    const auto logType = LogType::OP_TRANS_LEADER;
    const uint32_t valueLen = static_cast<uint32_t>(sizeof(HostAddr));

    std::string buf;
    buf.reserve(kHeadLen + sizeof(HostAddr));
    buf.append(reinterpret_cast<const char*>(&nowMs), sizeof(int64_t));
    buf.append(reinterpret_cast<const char*>(&logType), 1);
    buf.append(reinterpret_cast<const char*>(&valueLen), sizeof(valueLen));
    buf.append(reinterpret_cast<const char*>(&targetAddr), sizeof(HostAddr));
    return buf;
}

// Extracts the target address from an encoded "transfer leader" log entry.
// The entry must be exactly kHeadLen + sizeof(HostAddr) bytes long (enforced
// with CHECK_EQ). The two members of the address are read back to back,
// starting right after the kHeadLen-byte header.
HostAddr decodeTransLeader(folly::StringPiece encoded) {
    CHECK_EQ(kHeadLen + sizeof(HostAddr), encoded.size());

    const char* payload = encoded.begin() + kHeadLen;
    HostAddr target;
    memcpy(&target.first, payload, sizeof(target.first));
    memcpy(&target.second, payload + sizeof(target.first), sizeof(target.second));
    return target;
}
} // namespace kvstore
} // namespace nebula

3 changes: 3 additions & 0 deletions src/kvstore/LogEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ enum LogType : char {
OP_REMOVE_PREFIX = 0x5,
OP_REMOVE_RANGE = 0x6,
OP_ADD_LEARNER = 0x07,
OP_TRANS_LEADER = 0x08,
};


Expand All @@ -36,6 +37,8 @@ std::vector<folly::StringPiece> decodeMultiValues(folly::StringPiece encoded);
std::string encodeLearner(const HostAddr& learner);
HostAddr decodeLearner(const std::string& encoded);

std::string encodeTransLeader(const HostAddr& targetAddr);
HostAddr decodeTransLeader(folly::StringPiece encoded);
} // namespace kvstore
} // namespace nebula
#endif // KVSTORE_LOGENCODER_H_
Loading

0 comments on commit c7c1818

Please sign in to comment.