Skip to content

Commit

Permalink
Implement balance logic in meta
Browse files Browse the repository at this point in the history
  • Loading branch information
heng committed Jun 12, 2019
1 parent 8a3735a commit fe82c25
Show file tree
Hide file tree
Showing 23 changed files with 1,078 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/common/time/TimeUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef COMMON_TIME_TIMEUTILS_H_
#define COMMON_TIME_TIMEUTILS_H_

#include "base/Base.h"
#include <sys/time.h>

namespace nebula {
Expand Down
15 changes: 15 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum ErrorCode {
E_NOT_FOUND = -23,
E_INVALID_HOST = -24,
E_UNSUPPORTED = -25,
E_BALANCER_RUNNING = -26,

// KV Failure
E_STORE_FAILURE = -31,
Expand Down Expand Up @@ -311,6 +312,19 @@ struct HBReq {
1: common.HostAddr host,
}

struct BalanceReq {
1: optional common.GraphSpaceID space_id,
// Specify the balance id to check the status of the related balance plan
2: optional i64 id,
}

struct BalanceResp {
1: ErrorCode code,
2: i64 id,
// Valid if code equals E_LEADER_CHANGED.
3: common.HostAddr leader,
}

service MetaService {
ExecResp createSpace(1: CreateSpaceReq req);
ExecResp dropSpace(1: DropSpaceReq req);
Expand Down Expand Up @@ -343,5 +357,6 @@ service MetaService {
ScanResp scan(1: ScanReq req);

HBResp heartBeat(1: HBReq req);
BalanceResp balance(1: BalanceReq req);
}

24 changes: 24 additions & 0 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,25 @@ struct AddEdgesRequest {
3: bool overwritable,
}

struct AdminExecResp {

}

struct AddPartReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
}

struct RemovePartReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
}

struct MemberChangeReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
}

service StorageService {
QueryResponse getOutBound(1: GetNeighborsRequest req)
QueryResponse getInBound(1: GetNeighborsRequest req)
Expand All @@ -173,5 +192,10 @@ service StorageService {

ExecResponse addVertices(1: AddVerticesRequest req);
ExecResponse addEdges(1: AddEdgesRequest req);

// Interfaces for admin operations
AdminExecResp addPart(1: AddPartReq req);
AdminExecResp removePart(1: RemovePartReq req);
AdminExecResp memberChange(1: MemberChangeReq req);
}

7 changes: 6 additions & 1 deletion src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ add_library(
processors/customKV/RemoveRangeProcessor.cpp
processors/customKV/ScanProcessor.cpp
processors/admin/HBProcessor.cpp
processors/admin/BalanceProcessor.cpp
processors/admin/Balancer.cpp
processors/admin/BalancePlan.cpp
processors/admin/BalanceTask.cpp
processors/admin/AdminClient.cpp
)

add_dependencies(
meta_service_handler
base_obj
Expand Down Expand Up @@ -68,5 +74,4 @@ add_dependencies(
common_thrift_obj
schema_obj)


add_subdirectory(test)
6 changes: 6 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "meta/processors/customKV/RemoveProcessor.h"
#include "meta/processors/customKV/RemoveRangeProcessor.h"
#include "meta/processors/admin/HBProcessor.h"
#include "meta/processors/admin/BalanceProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand Down Expand Up @@ -190,5 +191,10 @@ MetaServiceHandler::future_heartBeat(const cpp2::HBReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::BalanceResp>
MetaServiceHandler::future_balance(const cpp2::BalanceReq& req) {
auto* processor = BalanceProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}
} // namespace meta
} // namespace nebula
3 changes: 3 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::HBResp>
future_heartBeat(const cpp2::HBReq& req) override;

folly::Future<cpp2::BalanceResp>
future_balance(const cpp2::BalanceReq& req) override;

private:
kvstore::KVStore* kvstore_ = nullptr;
};
Expand Down
9 changes: 9 additions & 0 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,5 +934,14 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, true);
}

folly::Future<StatusOr<int64_t>> MetaClient::balance() {
cpp2::BalanceReq req;
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_balance(request);
}, [] (cpp2::BalanceResp&& resp) -> int64_t {
return resp.id;
}, true);
}
} // namespace meta
} // namespace nebula
4 changes: 4 additions & 0 deletions src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ class MetaClient {
folly::Future<StatusOr<bool>>
removeRange(std::string segment, std::string start, std::string end);

// Operations for admin
folly::Future<StatusOr<int64_t>>
balance();

// Opeartions for cache.
StatusOr<GraphSpaceID> getSpaceIdByNameFromCache(const std::string& name);

Expand Down
21 changes: 1 addition & 20 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,13 @@
#include "meta/MetaServiceUtils.h"
#include "meta/common/MetaCommon.h"
#include "network/NetworkUtils.h"
#include "meta/processors/Common.h"

namespace nebula {
namespace meta {

using nebula::network::NetworkUtils;

const PartitionID kDefaultPartId = 0;
const GraphSpaceID kDefaultSpaceId = 0;

class LockUtils {
public:
LockUtils() = delete;
#define GENERATE_LOCK(Entry) \
static folly::SharedMutex& Entry##Lock() { \
static folly::SharedMutex l; \
return l; \
}

GENERATE_LOCK(space);
GENERATE_LOCK(id);
GENERATE_LOCK(tag);
GENERATE_LOCK(edge);

#undef GENERATE_LOCK
};

#define CHECK_SPACE_ID_AND_RETURN(spaceID) \
if (spaceExist(spaceID) == Status::SpaceNotFound()) { \
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); \
Expand Down
39 changes: 39 additions & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef META_PROCESSORS_COMMON_H_
#define META_PROCESSORS_COMMON_H_

#include "base/Base.h"

namespace nebula {
namespace meta {

static const PartitionID kDefaultPartId = 0;
static const GraphSpaceID kDefaultSpaceId = 0;

class LockUtils {
public:
LockUtils() = delete;
#define GENERATE_LOCK(Entry) \
static folly::SharedMutex& Entry##Lock() { \
static folly::SharedMutex l; \
return l; \
}

GENERATE_LOCK(space);
GENERATE_LOCK(id);
GENERATE_LOCK(tag);
GENERATE_LOCK(edge);

#undef GENERATE_LOCK
};


} // namespace meta
} // namespace nebula
#endif // META_PROCESSORS_COMMON_H_

96 changes: 96 additions & 0 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "meta/processors/admin/AdminClient.h"

namespace nebula {
namespace meta {

folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& dst) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(leader);
UNUSED(dst);
if (injector_) {
return injector_->transLeader();
}
return Status::OK();
}

folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host,
bool asLearner) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(host);
UNUSED(asLearner);
if (injector_) {
return injector_->addPart();
}
return Status::OK();
}

folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId, PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
if (injector_) {
return injector_->addLearner();
}
return Status::OK();
}

folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
if (injector_) {
return injector_->waitingForCatchUpData();
}
return Status::OK();
}

folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId, PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
if (injector_) {
return injector_->memberChange();
}
return Status::OK();
}

folly::Future<Status> AdminClient::updateMeta(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& dst) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(leader);
UNUSED(dst);
if (injector_) {
return injector_->updateMeta();
}
return Status::OK();
}

folly::Future<Status> AdminClient::removePart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host) {
UNUSED(spaceId);
UNUSED(partId);
UNUSED(host);
if (injector_) {
return injector_->removePart();
}
return Status::OK();
}

} // namespace meta
} // namespace nebula

69 changes: 69 additions & 0 deletions src/meta/processors/admin/AdminClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_
#define META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_

#include "base/Base.h"
#include <folly/executors/IOThreadPoolExecutor.h>
#include "base/Status.h"
#include "thrift/ThriftClientManager.h"

namespace nebula {
namespace meta {

class FaultInjector {
public:
virtual Status transLeader() = 0;
virtual Status addPart() = 0;
virtual Status addLearner() = 0;
virtual Status waitingForCatchUpData() = 0;
virtual Status memberChange() = 0;
virtual Status updateMeta() = 0;
virtual Status removePart() = 0;
};

class AdminClient {
public:
AdminClient() = default;

explicit AdminClient(std::unique_ptr<FaultInjector> injector)
: injector_(std::move(injector)) {}

~AdminClient() = default;

folly::Future<Status> transLeader(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& dst);

folly::Future<Status> addPart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host,
bool asLearner);

folly::Future<Status> addLearner(GraphSpaceID spaceId, PartitionID partId);

folly::Future<Status> waitingForCatchUpData(GraphSpaceID spaceId, PartitionID partId);

folly::Future<Status> memberChange(GraphSpaceID spaceId, PartitionID partId);

folly::Future<Status> updateMeta(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& dst);

folly::Future<Status> removePart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host);

private:
std::unique_ptr<FaultInjector> injector_{nullptr};
};
} // namespace meta
} // namespace nebula

#endif // META_PROCESSORS_ADMIN_STORAGEADMINCLIENT_H_
Loading

0 comments on commit fe82c25

Please sign in to comment.