Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Balance skeleton on meta. #516

Merged
merged 6 commits into from
Jul 4, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ enum ErrorCode {
E_INVALID_HOST = -24,
E_UNSUPPORTED = -25,
E_NOT_DROP = -26,
E_BALANCER_RUNNING = -27,

// KV Failure
E_STORE_FAILURE = -31,
Expand Down Expand Up @@ -417,6 +418,19 @@ struct CheckPasswordReq {
2: string encoded_pwd,
}

struct BalanceReq {
1: optional common.GraphSpaceID space_id,
// Specify the balance id to check the status of the related balance plan
2: optional i64 id,
}

struct BalanceResp {
1: ErrorCode code,
2: i64 id,
// Valid if code equals E_LEADER_CHANGED.
3: common.HostAddr leader,
dangleptr marked this conversation as resolved.
Show resolved Hide resolved
}

service MetaService {
ExecResp createSpace(1: CreateSpaceReq req);
ExecResp dropSpace(1: DropSpaceReq req);
Expand Down Expand Up @@ -448,8 +462,6 @@ service MetaService {
ExecResp removeRange(1: RemoveRangeReq req);
ScanResp scan(1: ScanReq req);

HBResp heartBeat(1: HBReq req);

ExecResp createUser(1: CreateUserReq req);
ExecResp dropUser(1: DropUserReq req);
ExecResp alterUser(1: AlterUserReq req);
Expand All @@ -461,5 +473,7 @@ service MetaService {
ExecResp changePassword(1: ChangePasswordReq req);
ExecResp checkPassword(1: CheckPasswordReq req);

HBResp heartBeat(1: HBReq req);
BalanceResp balance(1: BalanceReq req);
}

24 changes: 24 additions & 0 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,25 @@ struct AddEdgesRequest {
3: bool overwritable,
}

struct AdminExecResp {

}

struct AddPartReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
}

struct RemovePartReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
}

struct MemberChangeReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
}

service StorageService {
QueryResponse getOutBound(1: GetNeighborsRequest req)
QueryResponse getInBound(1: GetNeighborsRequest req)
Expand All @@ -173,5 +192,10 @@ service StorageService {

ExecResponse addVertices(1: AddVerticesRequest req);
ExecResponse addEdges(1: AddEdgesRequest req);

// Interfaces for admin operations
AdminExecResp addPart(1: AddPartReq req);
AdminExecResp removePart(1: RemovePartReq req);
AdminExecResp memberChange(1: MemberChangeReq req);
}

2 changes: 1 addition & 1 deletion src/kvstore/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,5 +171,5 @@ nebula_link_libraries(
boost_regex
)

nebula_add_test(multi_versions_perf_test_bm)
# nebula_add_test(multi_versions_perf_test_bm)

7 changes: 6 additions & 1 deletion src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ add_library(
processors/customKV/ScanProcessor.cpp
processors/admin/HBProcessor.cpp
processors/usersMan/AuthenticationProcessor.cpp
processors/admin/BalanceProcessor.cpp
processors/admin/Balancer.cpp
processors/admin/BalancePlan.cpp
processors/admin/BalanceTask.cpp
processors/admin/AdminClient.cpp
)

add_dependencies(
meta_service_handler
base_obj
Expand Down Expand Up @@ -69,5 +75,4 @@ add_dependencies(
common_thrift_obj
schema_obj)


add_subdirectory(test)
6 changes: 6 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "meta/processors/customKV/RemoveRangeProcessor.h"
#include "meta/processors/admin/HBProcessor.h"
#include "meta/processors/usersMan/AuthenticationProcessor.h"
#include "meta/processors/admin/BalanceProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand Down Expand Up @@ -251,5 +252,10 @@ MetaServiceHandler::future_checkPassword(const cpp2::CheckPasswordReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::BalanceResp>
MetaServiceHandler::future_balance(const cpp2::BalanceReq& req) {
auto* processor = BalanceProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}
} // namespace meta
} // namespace nebula
15 changes: 9 additions & 6 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListEdgesResp>
future_listEdges(const cpp2::ListEdgesReq& req) override;

/**
* HeartBeat
* */
folly::Future<cpp2::HBResp>
future_heartBeat(const cpp2::HBReq& req) override;

/**
* User manager
**/
Expand Down Expand Up @@ -140,6 +134,15 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ExecResp>
future_checkPassword(const cpp2::CheckPasswordReq& req) override;

/**
* HeartBeat
* */
folly::Future<cpp2::HBResp>
future_heartBeat(const cpp2::HBReq& req) override;

folly::Future<cpp2::BalanceResp>
future_balance(const cpp2::BalanceReq& req) override;

private:
kvstore::KVStore* kvstore_ = nullptr;
};
Expand Down
1 change: 1 addition & 0 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const std::string kIndexTable = "__index__"; // NOLINT
const std::string kUsersTable = "__users__"; // NOLINT
const std::string kRolesTable = "__roles__"; // NOLINT


const std::string kHostOnline = "Online"; // NOLINT
const std::string kHostOffline = "Offline"; // NOLINT

Expand Down
9 changes: 9 additions & 0 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -966,5 +966,14 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, true);
}

folly::Future<StatusOr<int64_t>> MetaClient::balance() {
cpp2::BalanceReq req;
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_balance(request);
}, [] (cpp2::BalanceResp&& resp) -> int64_t {
return resp.id;
}, true);
}
} // namespace meta
} // namespace nebula
6 changes: 5 additions & 1 deletion src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ class MetaClient {
folly::Future<StatusOr<bool>>
removeRange(std::string segment, std::string start, std::string end);

// Operations for cache.
// Operations for admin
folly::Future<StatusOr<int64_t>>
balance();

// Opeartions for cache.
StatusOr<GraphSpaceID> getSpaceIdByNameFromCache(const std::string& name);

StatusOr<TagID> getTagIDByNameFromCache(const GraphSpaceID& space, const std::string& name);
Expand Down
22 changes: 1 addition & 21 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,13 @@
#include "meta/MetaServiceUtils.h"
#include "meta/common/MetaCommon.h"
#include "network/NetworkUtils.h"
#include "meta/processors/Common.h"

namespace nebula {
namespace meta {

using nebula::network::NetworkUtils;

const PartitionID kDefaultPartId = 0;
const GraphSpaceID kDefaultSpaceId = 0;

class LockUtils {
public:
LockUtils() = delete;
#define GENERATE_LOCK(Entry) \
static folly::SharedMutex& Entry##Lock() { \
static folly::SharedMutex l; \
return l; \
}

GENERATE_LOCK(space);
GENERATE_LOCK(id);
GENERATE_LOCK(tag);
GENERATE_LOCK(edge);
GENERATE_LOCK(user);

#undef GENERATE_LOCK
};

#define CHECK_SPACE_ID_AND_RETURN(spaceID) \
if (spaceExist(spaceID) == Status::SpaceNotFound()) { \
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \
Expand Down
42 changes: 42 additions & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef META_PROCESSORS_COMMON_H_
#define META_PROCESSORS_COMMON_H_

#include "base/Base.h"

namespace nebula {
namespace meta {

static const PartitionID kDefaultPartId = 0;
static const GraphSpaceID kDefaultSpaceId = 0;

using BalanceID = int64_t;

class LockUtils {
public:
LockUtils() = delete;
#define GENERATE_LOCK(Entry) \
static folly::SharedMutex& Entry##Lock() { \
static folly::SharedMutex l; \
return l; \
}

GENERATE_LOCK(space);
GENERATE_LOCK(id);
GENERATE_LOCK(tag);
GENERATE_LOCK(edge);
GENERATE_LOCK(user);

#undef GENERATE_LOCK
};


} // namespace meta
} // namespace nebula
#endif // META_PROCESSORS_COMMON_H_

96 changes: 96 additions & 0 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "meta/processors/admin/AdminClient.h"

namespace nebula {
namespace meta {

folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& dst) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(leader);
UNUSED(dst);
if (injector_) {
return injector_->transLeader();
}
return Status::OK();
}

folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host,
bool asLearner) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(host);
UNUSED(asLearner);
if (injector_) {
return injector_->addPart();
}
return Status::OK();
}

folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId, PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
if (injector_) {
return injector_->addLearner();
}
return Status::OK();
}

folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
if (injector_) {
return injector_->waitingForCatchUpData();
}
return Status::OK();
}

folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId, PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
if (injector_) {
return injector_->memberChange();
}
return Status::OK();
}

folly::Future<Status> AdminClient::updateMeta(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& dst) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(leader);
UNUSED(dst);
if (injector_) {
return injector_->updateMeta();
}
return Status::OK();
}

folly::Future<Status> AdminClient::removePart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(host);
if (injector_) {
return injector_->removePart();
}
return Status::OK();
}

} // namespace meta
} // namespace nebula

Loading