Skip to content

Commit

Permalink
Implement balance logic in meta
Browse files Browse the repository at this point in the history
  • Loading branch information
heng committed Jun 28, 2019
1 parent 34eb36d commit 2a441c5
Show file tree
Hide file tree
Showing 23 changed files with 1,986 additions and 34 deletions.
1 change: 1 addition & 0 deletions src/common/time/TimeUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef COMMON_TIME_TIMEUTILS_H_
#define COMMON_TIME_TIMEUTILS_H_

#include "base/Base.h"
#include <sys/time.h>

namespace nebula {
Expand Down
18 changes: 16 additions & 2 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum ErrorCode {
E_NOT_FOUND = -23,
E_INVALID_HOST = -24,
E_UNSUPPORTED = -25,
E_BALANCER_RUNNING = -26,

// KV Failure
E_STORE_FAILURE = -31,
Expand Down Expand Up @@ -416,6 +417,19 @@ struct CheckPasswordReq {
2: string encoded_pwd,
}

struct BalanceReq {
1: optional common.GraphSpaceID space_id,
// Specify the balance id to check the status of the related balance plan
2: optional i64 id,
}

struct BalanceResp {
1: ErrorCode code,
2: i64 id,
// Valid if code equals E_LEADER_CHANGED.
3: common.HostAddr leader,
}

service MetaService {
ExecResp createSpace(1: CreateSpaceReq req);
ExecResp dropSpace(1: DropSpaceReq req);
Expand Down Expand Up @@ -447,8 +461,6 @@ service MetaService {
ExecResp removeRange(1: RemoveRangeReq req);
ScanResp scan(1: ScanReq req);

HBResp heartBeat(1: HBReq req);

ExecResp createUser(1: CreateUserReq req);
ExecResp dropUser(1: DropUserReq req);
ExecResp alterUser(1: AlterUserReq req);
Expand All @@ -460,5 +472,7 @@ service MetaService {
ExecResp changePassword(1: ChangePasswordReq req);
ExecResp checkPassword(1: CheckPasswordReq req);

HBResp heartBeat(1: HBReq req);
BalanceResp balance(1: BalanceReq req);
}

24 changes: 24 additions & 0 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,25 @@ struct AddEdgesRequest {
3: bool overwritable,
}

struct AdminExecResp {

}

struct AddPartReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
}

struct RemovePartReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
}

struct MemberChangeReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
}

service StorageService {
QueryResponse getOutBound(1: GetNeighborsRequest req)
QueryResponse getInBound(1: GetNeighborsRequest req)
Expand All @@ -173,5 +192,10 @@ service StorageService {

ExecResponse addVertices(1: AddVerticesRequest req);
ExecResponse addEdges(1: AddEdgesRequest req);

// Interfaces for admin operations
AdminExecResp addPart(1: AddPartReq req);
AdminExecResp removePart(1: RemovePartReq req);
AdminExecResp memberChange(1: MemberChangeReq req);
}

7 changes: 6 additions & 1 deletion src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ add_library(
processors/customKV/ScanProcessor.cpp
processors/admin/HBProcessor.cpp
processors/usersMan/AuthenticationProcessor.cpp
processors/admin/BalanceProcessor.cpp
processors/admin/Balancer.cpp
processors/admin/BalancePlan.cpp
processors/admin/BalanceTask.cpp
processors/admin/AdminClient.cpp
)

add_dependencies(
meta_service_handler
base_obj
Expand Down Expand Up @@ -69,5 +75,4 @@ add_dependencies(
common_thrift_obj
schema_obj)


add_subdirectory(test)
6 changes: 6 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "meta/processors/customKV/RemoveRangeProcessor.h"
#include "meta/processors/admin/HBProcessor.h"
#include "meta/processors/usersMan/AuthenticationProcessor.h"
#include "meta/processors/admin/BalanceProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand Down Expand Up @@ -251,5 +252,10 @@ MetaServiceHandler::future_checkPassword(const cpp2::CheckPasswordReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::BalanceResp>
MetaServiceHandler::future_balance(const cpp2::BalanceReq& req) {
auto* processor = BalanceProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}
} // namespace meta
} // namespace nebula
15 changes: 9 additions & 6 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListEdgesResp>
future_listEdges(const cpp2::ListEdgesReq& req) override;

/**
* HeartBeat
* */
folly::Future<cpp2::HBResp>
future_heartBeat(const cpp2::HBReq& req) override;

/**
* User manager
**/
Expand Down Expand Up @@ -140,6 +134,15 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ExecResp>
future_checkPassword(const cpp2::CheckPasswordReq& req) override;

/**
* HeartBeat
* */
folly::Future<cpp2::HBResp>
future_heartBeat(const cpp2::HBReq& req) override;

folly::Future<cpp2::BalanceResp>
future_balance(const cpp2::BalanceReq& req) override;

private:
kvstore::KVStore* kvstore_ = nullptr;
};
Expand Down
1 change: 1 addition & 0 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const std::string kIndexTable = "__index__"; // NOLINT
const std::string kUsersTable = "__users__"; // NOLINT
const std::string kRolesTable = "__roles__"; // NOLINT


const std::string kHostOnline = "Online"; // NOLINT
const std::string kHostOffline = "Offline"; // NOLINT

Expand Down
9 changes: 9 additions & 0 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -957,5 +957,14 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, true);
}

folly::Future<StatusOr<int64_t>> MetaClient::balance() {
cpp2::BalanceReq req;
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_balance(request);
}, [] (cpp2::BalanceResp&& resp) -> int64_t {
return resp.id;
}, true);
}
} // namespace meta
} // namespace nebula
6 changes: 5 additions & 1 deletion src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,11 @@ class MetaClient {
folly::Future<StatusOr<bool>>
removeRange(std::string segment, std::string start, std::string end);

// Operations for cache.
// Operations for admin
folly::Future<StatusOr<int64_t>>
balance();

// Opeartions for cache.
StatusOr<GraphSpaceID> getSpaceIdByNameFromCache(const std::string& name);

StatusOr<TagID> getTagIDByNameFromCache(const GraphSpaceID& space, const std::string& name);
Expand Down
22 changes: 1 addition & 21 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,13 @@
#include "meta/MetaServiceUtils.h"
#include "meta/common/MetaCommon.h"
#include "network/NetworkUtils.h"
#include "meta/processors/Common.h"

namespace nebula {
namespace meta {

using nebula::network::NetworkUtils;

const PartitionID kDefaultPartId = 0;
const GraphSpaceID kDefaultSpaceId = 0;

class LockUtils {
public:
LockUtils() = delete;
#define GENERATE_LOCK(Entry) \
static folly::SharedMutex& Entry##Lock() { \
static folly::SharedMutex l; \
return l; \
}

GENERATE_LOCK(space);
GENERATE_LOCK(id);
GENERATE_LOCK(tag);
GENERATE_LOCK(edge);
GENERATE_LOCK(user);

#undef GENERATE_LOCK
};

#define CHECK_SPACE_ID_AND_RETURN(spaceID) \
if (spaceExist(spaceID) == Status::SpaceNotFound()) { \
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \
Expand Down
42 changes: 42 additions & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef META_PROCESSORS_COMMON_H_
#define META_PROCESSORS_COMMON_H_

#include "base/Base.h"

namespace nebula {
namespace meta {

static const PartitionID kDefaultPartId = 0;
static const GraphSpaceID kDefaultSpaceId = 0;

using BalanceID = int64_t;

class LockUtils {
public:
LockUtils() = delete;
#define GENERATE_LOCK(Entry) \
static folly::SharedMutex& Entry##Lock() { \
static folly::SharedMutex l; \
return l; \
}

GENERATE_LOCK(space);
GENERATE_LOCK(id);
GENERATE_LOCK(tag);
GENERATE_LOCK(edge);
GENERATE_LOCK(user);

#undef GENERATE_LOCK
};


} // namespace meta
} // namespace nebula
#endif // META_PROCESSORS_COMMON_H_

96 changes: 96 additions & 0 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "meta/processors/admin/AdminClient.h"

namespace nebula {
namespace meta {

folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& dst) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(leader);
UNUSED(dst);
if (injector_) {
return injector_->transLeader();
}
return Status::OK();
}

folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host,
bool asLearner) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(host);
UNUSED(asLearner);
if (injector_) {
return injector_->addPart();
}
return Status::OK();
}

folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId, PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
if (injector_) {
return injector_->addLearner();
}
return Status::OK();
}

folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
if (injector_) {
return injector_->waitingForCatchUpData();
}
return Status::OK();
}

folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId, PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
if (injector_) {
return injector_->memberChange();
}
return Status::OK();
}

folly::Future<Status> AdminClient::updateMeta(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& dst) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(leader);
UNUSED(dst);
if (injector_) {
return injector_->updateMeta();
}
return Status::OK();
}

folly::Future<Status> AdminClient::removePart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(host);
if (injector_) {
return injector_->removePart();
}
return Status::OK();
}

} // namespace meta
} // namespace nebula

Loading

0 comments on commit 2a441c5

Please sign in to comment.