diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index c3dd47b4a9f..e801342d42b 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -81,6 +81,7 @@ add_executable( $ $ $ + $ $ $ $ diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 486fde9c96f..60e2eaaf864 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -161,12 +161,15 @@ struct AddEdgesRequest { } struct AdminExecResp { - + 1: ErrorCode code, + // Only valid when code is E_LEADER_CHANAGED. + 2: common.HostAddr leader, } struct AddPartReq { 1: common.GraphSpaceID space_id, 2: common.PartitionID part_id, + 3: bool as_learner, } struct RemovePartReq { @@ -179,6 +182,25 @@ struct MemberChangeReq { 2: common.PartitionID part_id, } +struct TransLeaderReq { + 1: common.GraphSpaceID space_id, + 2: common.PartitionID part_id, + 3: common.HostAddr new_leader, +} + +struct AddLearnerReq { + 1: common.GraphSpaceID space_id, + 2: common.PartitionID part_id, + 3: common.HostAddr learner, +} + +struct CatchUpDataReq { + 1: common.GraphSpaceID space_id, + 2: common.GraphSpaceID part_id, + 3: common.HostAddr target, +} + + service StorageService { QueryResponse getOutBound(1: GetNeighborsRequest req) QueryResponse getInBound(1: GetNeighborsRequest req) @@ -194,7 +216,10 @@ service StorageService { ExecResponse addEdges(1: AddEdgesRequest req); // Interfaces for admin operations + AdminExecResp transLeader(1: TransLeaderReq req); AdminExecResp addPart(1: AddPartReq req); + AdminExecResp addLearner(1: AddLearnerReq req); + AdminExecResp waitingForCatchUpData(1: CatchUpDataReq req); AdminExecResp removePart(1: RemovePartReq req); AdminExecResp memberChange(1: MemberChangeReq req); } diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index cd925ec4021..fc160852503 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -49,6 +49,7 @@ add_dependencies( meta_service_handler base_obj meta_thrift_obj + storage_thrift_obj common_thrift_obj kvstore_obj thread_obj diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index 1014893b7b6..5542f07f116 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -5,90 +5,334 @@ */ #include "meta/processors/admin/AdminClient.h" +#include "meta/MetaServiceUtils.h" +#include "meta/processors/Common.h" + +DEFINE_int32(max_retry_times_admin_op, 3, "max retry times for admin request!"); namespace nebula { namespace meta { folly::Future AdminClient::transLeader(GraphSpaceID spaceId, - PartitionID partId, - const HostAddr& leader, - const HostAddr& dst) { - UNUSED(spaceId); - UNUSED(partId); - UNUSED(leader); - UNUSED(dst); + PartitionID partId, + const HostAddr& leader, + const HostAddr& dst) { if (injector_) { return injector_->transLeader(); } - return Status::OK(); + storage::cpp2::TransLeaderReq req; + req.set_space_id(spaceId); + req.set_part_id(partId); + req.set_new_leader(to(dst)); + return getResponse(leader, std::move(req), [] (auto client, auto request) { + return client->future_transLeader(request); + }, [this] (auto&& resp) -> Status { + switch (resp.get_code()) { + case storage::cpp2::ErrorCode::SUCCEEDED: + case storage::cpp2::ErrorCode::E_LEADER_CHANGED: { + return Status::OK(); + } + default: + return Status::Error("Unknown code %d", + static_cast(resp.get_code())); + } + }); } folly::Future AdminClient::addPart(GraphSpaceID spaceId, - PartitionID partId, - const HostAddr& host, - bool asLearner) { - UNUSED(spaceId); - UNUSED(partId); - UNUSED(host); - UNUSED(asLearner); + PartitionID partId, + const HostAddr& host, + bool asLearner) { if (injector_) { return injector_->addPart(); } - return Status::OK(); + storage::cpp2::AddPartReq req; + req.set_space_id(spaceId); + req.set_part_id(partId); + req.set_as_learner(asLearner); + return getResponse(host, std::move(req), [] (auto client, auto request) { + return client->future_addPart(request); + }, [this] (auto&& resp) -> Status { + if (resp.get_code() == storage::cpp2::ErrorCode::SUCCEEDED) { + return Status::OK(); + } else { + return Status::Error("Add part failed! code=%d", + static_cast(resp.get_code())); + } + }); } folly::Future AdminClient::addLearner(GraphSpaceID spaceId, PartitionID partId) { - UNUSED(spaceId); - UNUSED(partId); if (injector_) { return injector_->addLearner(); } - return Status::OK(); + storage::cpp2::AddLearnerReq req; + req.set_space_id(spaceId); + req.set_part_id(partId); + auto ret = getPeers(spaceId, partId); + if (!ret.ok()) { + return ret.status(); + } + folly::Promise pro; + auto f = pro.getFuture(); + getResponse(ret.value(), 0, std::move(req), [] (auto client, auto request) { + return client->future_addLearner(request); + }, 0, std::move(pro), FLAGS_max_retry_times_admin_op); + return f; } folly::Future AdminClient::waitingForCatchUpData(GraphSpaceID spaceId, - PartitionID partId) { - UNUSED(spaceId); - UNUSED(partId); + PartitionID partId) { if (injector_) { return injector_->waitingForCatchUpData(); } - return Status::OK(); + storage::cpp2::CatchUpDataReq req; + req.set_space_id(spaceId); + req.set_part_id(partId); + auto ret = getPeers(spaceId, partId); + if (!ret.ok()) { + return ret.status(); + } + folly::Promise pro; + auto f = pro.getFuture(); + getResponse(ret.value(), 0, std::move(req), [] (auto client, auto request) { + return client->future_waitingForCatchUpData(request); + }, 0, std::move(pro), FLAGS_max_retry_times_admin_op); + return f; } folly::Future AdminClient::memberChange(GraphSpaceID spaceId, PartitionID partId) { - UNUSED(spaceId); - UNUSED(partId); if (injector_) { return injector_->memberChange(); } - return Status::OK(); + storage::cpp2::MemberChangeReq req; + req.set_space_id(spaceId); + req.set_part_id(partId); + auto ret = getPeers(spaceId, partId); + if (!ret.ok()) { + return ret.status(); + } + folly::Promise pro; + auto f = pro.getFuture(); + getResponse(ret.value(), 0, std::move(req), [] (auto client, auto request) { + return client->future_memberChange(request); + }, 0, std::move(pro), FLAGS_max_retry_times_admin_op); + return f; } folly::Future AdminClient::updateMeta(GraphSpaceID spaceId, PartitionID partId, - const HostAddr& leader, + const HostAddr& src, const HostAddr& dst) { - UNUSED(spaceId); - UNUSED(partId); - UNUSED(leader); - UNUSED(dst); if (injector_) { return injector_->updateMeta(); } - return Status::OK(); + CHECK_NOTNULL(kv_); + auto ret = getPeers(spaceId, partId); + if (!ret.ok()) { + return ret.status(); + } + auto peers = std::move(ret).value(); + auto it = std::find(peers.begin(), peers.end(), src); + CHECK(it != peers.end()); + peers.erase(it); + peers.emplace_back(dst); + std::vector thriftPeers; + thriftPeers.resize(peers.size()); + std::transform(peers.begin(), peers.end(), thriftPeers.begin(), [this](const auto& h) { + return to(h); + }); + folly::Promise pro; + auto f = pro.getFuture(); + std::vector data; + data.emplace_back(MetaServiceUtils::partKey(spaceId, partId), + MetaServiceUtils::partVal(thriftPeers)); + kv_->asyncMultiPut(kDefaultSpaceId, + kDefaultPartId, + std::move(data), + [this, p = std::move(pro)] (kvstore::ResultCode code) mutable { + if (code == kvstore::ResultCode::SUCCEEDED) { + p.setValue(Status::OK()); + } else { + p.setValue(Status::Error("Access kv failed, code:%d", static_cast(code))); + } + }); + return f; } folly::Future AdminClient::removePart(GraphSpaceID spaceId, - PartitionID partId, - const HostAddr& host) { - UNUSED(spaceId); - UNUSED(partId); - UNUSED(host); + PartitionID partId, + const HostAddr& host) { if (injector_) { return injector_->removePart(); } - return Status::OK(); + storage::cpp2::RemovePartReq req; + req.set_space_id(spaceId); + req.set_part_id(partId); + return getResponse(host, std::move(req), [] (auto client, auto request) { + return client->future_removePart(request); + }, [this] (auto&& resp) -> Status { + if (resp.get_code() == storage::cpp2::ErrorCode::SUCCEEDED) { + return Status::OK(); + } else { + return Status::Error("Remove part failed! code=%d", + static_cast(resp.get_code())); + } + }); +} + +template +folly::Future AdminClient::getResponse( + const HostAddr& host, + Request req, + RemoteFunc remoteFunc, + RespGenerator respGen) { + folly::Promise pro; + auto f = pro.getFuture(); + auto* evb = ioThreadPool_->getEventBase(); + auto client = clientsMan_->client(host, evb); + remoteFunc(client, std::move(req)) + .then(evb, [p = std::move(pro), + respGen, + this] (folly::Try&& t) mutable { + // exception occurred during RPC + if (t.hasException()) { + p.setValue(Status::Error(folly::stringPrintf("RPC failure in MetaClient: %s", + t.exception().what().c_str()))); + return; + } + auto&& resp = std::move(t).value(); + p.setValue(respGen(std::move(resp))); + }); + return f; +} + +template +void AdminClient::getResponse( + std::vector hosts, + int32_t index, + Request req, + RemoteFunc remoteFunc, + int32_t retry, + folly::Promise pro, + int32_t retryLimit) { + auto* evb = ioThreadPool_->getEventBase(); + CHECK_GE(index, 0); + CHECK_LT(index, hosts.size()); + auto client = clientsMan_->client(hosts[index], evb); + remoteFunc(client, req) + .then(evb, [p = std::move(pro), + hosts = std::move(hosts), + index, + req = std::move(req), + remoteFunc = std::move(remoteFunc), + retry, + retryLimit, + this] (folly::Try&& t) mutable { + // exception occurred during RPC + if (t.hasException()) { + if (retry < retryLimit) { + LOG(INFO) << "Rpc failure to " << hosts[index] + << ", retry " << retry + << ", limit " << retryLimit; + getResponse(std::move(hosts), + index + 1, + std::move(req), + remoteFunc, + retry + 1, + std::move(p), + retryLimit); + return; + } + p.setValue(Status::Error(folly::stringPrintf("RPC failure in MetaClient: %s", + t.exception().what().c_str()))); + return; + } + auto resp = std::move(t).value(); + switch (resp.get_code()) { + case storage::cpp2::ErrorCode::SUCCEEDED: { + p.setValue(Status::OK()); + return; + } + case storage::cpp2::ErrorCode::E_LEADER_CHANGED: { + if (retry < retryLimit) { + HostAddr leader(resp.get_leader().get_ip(), resp.get_leader().get_port()); + int32_t leaderIndex = 0; + for (auto& h : hosts) { + if (h == leader) { + break; + } + leaderIndex++; + } + LOG(INFO) << "Return leder change from " << hosts[index] + << ", new leader is " << leader + << ", retry " << retry + << ", limit " << retryLimit; + getResponse(std::move(hosts), + leaderIndex, + std::move(req), + std::move(remoteFunc), + retry + 1, + std::move(p), + retryLimit); + return; + } + p.setValue(Status::Error("Leader changed!")); + return; + } + default: { + if (retry < retryLimit) { + LOG(INFO) << "Unknown code " << static_cast(resp.get_code()) + << " from " << hosts[index] + << ", retry " << retry + << ", limit " << retryLimit; + getResponse(std::move(hosts), + index + 1, + std::move(req), + std::move(remoteFunc), + retry + 1, + std::move(p), + retryLimit); + return; + } + p.setValue(Status::Error("Unknown code %d", static_cast(resp.get_code()))); + return; + } + } + }); +} + +nebula::cpp2::HostAddr AdminClient::to(const HostAddr& addr) { + nebula::cpp2::HostAddr thriftAddr; + thriftAddr.set_ip(addr.first); + thriftAddr.set_port(addr.second); + return thriftAddr; +} + +StatusOr> AdminClient::getPeers(GraphSpaceID spaceId, PartitionID partId) { + CHECK_NOTNULL(kv_); + auto partKey = MetaServiceUtils::partKey(spaceId, partId); + std::string value; + auto code = kv_->get(kDefaultSpaceId, kDefaultPartId, partKey, &value); + switch (code) { + case kvstore::ResultCode::SUCCEEDED: { + auto partHosts = MetaServiceUtils::parsePartVal(value); + std::vector hosts; + hosts.resize(partHosts.size()); + std::transform(partHosts.begin(), partHosts.end(), hosts.begin(), [](const auto& h) { + return HostAddr(h.get_ip(), h.get_port()); + }); + return hosts; + } + case kvstore::ResultCode::ERR_KEY_NOT_FOUND: + return Status::Error("Key Not Found"); + default: + LOG(WARNING) << "Get peers failed, error " << static_cast(code); + break; + } + return Status::Error("Get Failed"); } } // namespace meta diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h index cd0e3f60df4..5952212ef67 100644 --- a/src/meta/processors/admin/AdminClient.h +++ b/src/meta/processors/admin/AdminClient.h @@ -11,6 +11,8 @@ #include #include "base/Status.h" #include "thrift/ThriftClientManager.h" +#include "gen-cpp2/StorageServiceAsyncClient.h" +#include "kvstore/KVStore.h" namespace nebula { namespace meta { @@ -27,10 +29,21 @@ class FaultInjector { virtual folly::Future removePart() = 0; }; +static const HostAddr kRandomPeer(0, 0); + class AdminClient { + FRIEND_TEST(AdminClientTest, RetryTest); + public: AdminClient() = default; + explicit AdminClient(kvstore::KVStore* kv) + : kv_(kv) { + ioThreadPool_ = std::make_unique(10); + clientsMan_ = std::make_unique< + thrift::ThriftClientManager>(); + } + explicit AdminClient(std::unique_ptr injector) : injector_(std::move(injector)) {} @@ -39,7 +52,7 @@ class AdminClient { folly::Future transLeader(GraphSpaceID spaceId, PartitionID partId, const HostAddr& leader, - const HostAddr& dst); + const HostAddr& dst = kRandomPeer); folly::Future addPart(GraphSpaceID spaceId, PartitionID partId, @@ -65,8 +78,36 @@ class AdminClient { return injector_.get(); } +private: + template + folly::Future getResponse(const HostAddr& host, + Request req, + RemoteFunc remoteFunc, + RespGenerator respGen); + + template + void getResponse(std::vector hosts, + int32_t index, + Request req, + RemoteFunc remoteFunc, + int32_t retry, + folly::Promise pro, + int32_t retryLimit); + + Status handleResponse(const storage::cpp2::AdminExecResp& resp); + + nebula::cpp2::HostAddr to(const HostAddr& addr); + + StatusOr> getPeers(GraphSpaceID spaceId, PartitionID partId); + private: std::unique_ptr injector_{nullptr}; + kvstore::KVStore* kv_ = nullptr; + std::unique_ptr ioThreadPool_{nullptr}; + std::unique_ptr> + clientsMan_; }; } // namespace meta } // namespace nebula diff --git a/src/meta/processors/admin/BalanceTask.cpp b/src/meta/processors/admin/BalanceTask.cpp index 38a5949edac..ec593396fcf 100644 --- a/src/meta/processors/admin/BalanceTask.cpp +++ b/src/meta/processors/admin/BalanceTask.cpp @@ -23,7 +23,7 @@ const std::string kBalanceTaskTable = "__b_task__"; // NOLINT void BalanceTask::invoke() { CHECK_NOTNULL(client_); if (ret_ == Result::FAILED) { - endTimeMs_ = time::WallClock::fastNowInSec(); + endTimeMs_ = time::WallClock::fastNowInMilliSec(); saveInStore(); onError_(); return; @@ -33,12 +33,12 @@ void BalanceTask::invoke() { LOG(INFO) << taskIdStr_ << "Start to move part!"; status_ = Status::CHANGE_LEADER; ret_ = Result::IN_PROGRESS; - startTimeMs_ = time::WallClock::fastNowInSec(); + startTimeMs_ = time::WallClock::fastNowInMilliSec(); } case Status::CHANGE_LEADER: { LOG(INFO) << taskIdStr_ << "Ask the src to give up the leadership."; SAVE_STATE(); - client_->transLeader(spaceId_, partId_, src_, dst_).thenValue([this](auto&& resp) { + client_->transLeader(spaceId_, partId_, src_).thenValue([this](auto&& resp) { if (!resp.ok()) { ret_ = Result::FAILED; } else { diff --git a/src/meta/test/AdminClientTest.cpp b/src/meta/test/AdminClientTest.cpp new file mode 100644 index 00000000000..3030eb3f0a3 --- /dev/null +++ b/src/meta/test/AdminClientTest.cpp @@ -0,0 +1,263 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ +#include "base/Base.h" +#include +#include +#include +#include "meta/processors/admin/Balancer.h" +#include "interface/gen-cpp2/StorageService.h" +#include "fs/TempDir.h" +#include "test/ServerContext.h" +#include "meta/test/TestUtils.h" + +#define RETURN_OK(req) \ + UNUSED(req); \ + do { \ + folly::Promise pro; \ + auto f = pro.getFuture(); \ + storage::cpp2::AdminExecResp resp; \ + resp.set_code(storage::cpp2::ErrorCode::SUCCEEDED); \ + pro.setValue(std::move(resp)); \ + return f; \ + } while (false) + +#define RETURN_LEADER_CHANGED(req, leader) \ + UNUSED(req); \ + do { \ + folly::Promise pro; \ + auto f = pro.getFuture(); \ + storage::cpp2::AdminExecResp resp; \ + resp.set_code(storage::cpp2::ErrorCode::E_LEADER_CHANGED); \ + resp.set_leader(leader); \ + pro.setValue(std::move(resp)); \ + return f; \ + } while (false) + +DECLARE_int32(max_retry_times_admin_op); + +namespace nebula { +namespace meta { + +class TestStorageService : public storage::cpp2::StorageServiceSvIf { +public: + folly::Future + future_transLeader(const storage::cpp2::TransLeaderReq& req) override { + RETURN_OK(req); + } + + folly::Future + future_addPart(const storage::cpp2::AddPartReq& req) override { + RETURN_OK(req); + } + + folly::Future + future_addLearner(const storage::cpp2::AddLearnerReq& req) override { + RETURN_OK(req); + } + + folly::Future + future_waitingForCatchUpData(const storage::cpp2::CatchUpDataReq& req) override { + RETURN_OK(req); + } + + folly::Future + future_removePart(const storage::cpp2::RemovePartReq& req) override { + RETURN_OK(req); + } + + folly::Future + future_memberChange(const storage::cpp2::MemberChangeReq& req) override { + RETURN_OK(req); + } +}; + +class TestStorageServiceRetry : public TestStorageService { +public: + TestStorageServiceRetry(IPv4 ip, Port port) { + leader_.set_ip(ip); + leader_.set_port(port); + } + + folly::Future + future_addLearner(const storage::cpp2::AddLearnerReq& req) override { + RETURN_LEADER_CHANGED(req, leader_); + } + + folly::Future + future_waitingForCatchUpData(const storage::cpp2::CatchUpDataReq& req) override { + RETURN_LEADER_CHANGED(req, leader_); + } + + folly::Future + future_memberChange(const storage::cpp2::MemberChangeReq& req) override { + RETURN_LEADER_CHANGED(req, leader_); + } + +private: + nebula::cpp2::HostAddr leader_; +}; + +TEST(AdminClientTest, SimpleTest) { + auto sc = std::make_unique(); + auto handler = std::make_shared(); + sc->mockCommon("storage", 0, handler); + LOG(INFO) << "Start storage server on " << sc->port_; + + IPv4 localIp; + network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); + fs::TempDir rootPath("/tmp/AdminTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + auto client = std::make_unique(kv.get()); + + { + LOG(INFO) << "Test transLeader..."; + folly::Baton baton; + client->transLeader(0, 0, {localIp, sc->port_}).then([&baton](auto&& st) { + CHECK(st.ok()); + baton.post(); + }); + baton.wait(); + } + { + LOG(INFO) << "Test addPart..."; + folly::Baton baton; + client->addPart(0, 0, {localIp, sc->port_}, true).then([&baton](auto&& st) { + CHECK(st.ok()); + baton.post(); + }); + baton.wait(); + } + { + LOG(INFO) << "Test removePart..."; + folly::Baton baton; + client->removePart(0, 0, {localIp, sc->port_}).then([&baton](auto&& st) { + CHECK(st.ok()); + baton.post(); + }); + baton.wait(); + } +} + +TEST(AdminClientTest, RetryTest) { + auto sc1 = std::make_unique(); + auto handler1 = std::make_shared(); + sc1->mockCommon("storage", 0, handler1); + LOG(INFO) << "Start storage1 server on " << sc1->port_; + + IPv4 localIp; + network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); + + auto sc2 = std::make_unique(); + auto handler2 = std::make_shared(localIp, sc1->port_); + sc2->mockCommon("storage", 0, handler2); + LOG(INFO) << "Start storage2 server on " << sc2->port_; + + + LOG(INFO) << "Now test interfaces with retry to leader!"; + fs::TempDir rootPath("/tmp/AdminTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + { + LOG(INFO) << "Write some part information!"; + std::vector thriftPeers; + // The first peer is one broken host. + thriftPeers.emplace_back(apache::thrift::FragileConstructor::FRAGILE, 0, 0); + // The second one is not leader. + thriftPeers.emplace_back(apache::thrift::FragileConstructor::FRAGILE, localIp, sc2->port_); + // The third one is healthy. + thriftPeers.emplace_back(apache::thrift::FragileConstructor::FRAGILE, localIp, sc1->port_); + + std::vector data; + data.emplace_back(MetaServiceUtils::partKey(0, 1), + MetaServiceUtils::partVal(thriftPeers)); + folly::Baton baton; + kv->asyncMultiPut(kDefaultSpaceId, + kDefaultPartId, + std::move(data), + [this, &baton] (kvstore::ResultCode code) { + CHECK_EQ(kvstore::ResultCode::SUCCEEDED, code); + baton.post(); + }); + baton.wait(); + } + + LOG(INFO) << "Now test interfaces with retry to leader!"; + auto client = std::make_unique(kv.get()); + { + LOG(INFO) << "Test transLeader, return ok if target is not leader"; + folly::Baton baton; + client->transLeader(0, 1, {localIp, sc2->port_}).then([&baton](auto&& st) { + CHECK(st.ok()); + baton.post(); + }); + baton.wait(); + } + { + LOG(INFO) << "Test member change..."; + folly::Baton baton; + client->memberChange(0, 1).then([&baton](auto&& st) { + CHECK(st.ok()); + baton.post(); + }); + baton.wait(); + } + { + LOG(INFO) << "Test add learner..."; + folly::Baton baton; + client->addLearner(0, 1).then([&baton](auto&& st) { + CHECK(st.ok()); + baton.post(); + }); + baton.wait(); + } + { + LOG(INFO) << "Test waitingForCatchUpData..."; + folly::Baton baton; + client->waitingForCatchUpData(0, 1).then([&baton](auto&& st) { + CHECK(st.ok()); + baton.post(); + }); + baton.wait(); + } + FLAGS_max_retry_times_admin_op = 1; + { + LOG(INFO) << "Test member change..."; + folly::Baton baton; + client->memberChange(0, 1).then([&baton](auto&& st) { + CHECK(!st.ok()); + CHECK_EQ("Leader changed!", st.toString()); + baton.post(); + }); + baton.wait(); + } + { + LOG(INFO) << "Test update meta..."; + folly::Baton baton; + client->updateMeta(0, 1, HostAddr(0, 0), HostAddr(1, 1)).then([&baton](auto&& st) { + CHECK(st.ok()); + baton.post(); + }); + baton.wait(); + auto peersRet = client->getPeers(0, 1); + CHECK(peersRet.ok()); + auto hosts = std::move(peersRet).value(); + ASSERT_EQ(3, hosts.size()); + ASSERT_EQ(HostAddr(localIp, sc2->port_), hosts[0]); + ASSERT_EQ(HostAddr(localIp, sc1->port_), hosts[1]); + ASSERT_EQ(HostAddr(1, 1), hosts[2]); + } +} + +} // namespace meta +} // namespace nebula + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/meta/test/BalanceIntegrationTest.cpp b/src/meta/test/BalanceIntegrationTest.cpp new file mode 100644 index 00000000000..251bff8d470 --- /dev/null +++ b/src/meta/test/BalanceIntegrationTest.cpp @@ -0,0 +1,35 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ +#include "base/Base.h" +#include +#include +#include +#include "meta/processors/admin/Balancer.h" +#include "meta/test/TestUtils.h" +#include "storage/test/TestUtils.h" +#include "fs/TempDir.h" + +namespace nebula { +namespace meta { + +TEST(BalanceIntegrationTest, SimpleTest) { + auto sc = std::make_unique(); + auto handler = std::make_shared(nullptr, nullptr); + sc->mockCommon("storage", 0, handler); + LOG(INFO) << "Start storage server on " << sc->port_; +} + +} // namespace meta +} // namespace nebula + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 440320e14bf..bf3e9d4c42e 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -25,6 +25,7 @@ add_executable( $ $ $ + $ $ $ $ @@ -56,6 +57,7 @@ add_executable( $ $ $ + $ $ $ $ @@ -84,6 +86,7 @@ add_executable( $ $ $ + $ $ $ $ @@ -117,6 +120,7 @@ add_executable( $ $ $ + $ $ $ $ @@ -143,6 +147,7 @@ add_executable( $ $ $ + $ $ $ $ @@ -179,6 +184,7 @@ add_executable( $ $ $ + $ $ $ $ @@ -215,6 +221,7 @@ add_executable( $ $ $ + $ $ $ $ @@ -236,6 +243,68 @@ nebula_link_libraries( ) nebula_add_test(balancer_test) +add_executable( + balance_integration_test + BalanceIntegrationTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +nebula_link_libraries( + balance_integration_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + gtest +) +nebula_add_test(balance_integration_test) + +add_executable( + admin_client_test + AdminClientTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +nebula_link_libraries( + admin_client_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + gtest +) +nebula_add_test(admin_client_test) + add_executable( authentication_test AuthProcessorTest.cpp @@ -243,6 +312,7 @@ add_executable( $ $ $ + $ $ $ $ diff --git a/src/storage/AdminProcessor.h b/src/storage/AdminProcessor.h new file mode 100644 index 00000000000..f40f7d39d63 --- /dev/null +++ b/src/storage/AdminProcessor.h @@ -0,0 +1,108 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef STORAGE_TRANSLEADERPROCESSOR_H_ +#define STORAGE_TRANSLEADERPROCESSOR_H_ + +#include "base/Base.h" +#include "storage/BaseProcessor.h" + +namespace nebula { +namespace storage { + +class TransLeaderProcessor : public BaseProcessor { +public: + static TransLeaderProcessor* instance(kvstore::KVStore* kvstore) { + return new TransLeaderProcessor(kvstore); + } + + void process(const cpp2::TransLeaderReq& req) { + UNUSED(req); + } + +private: + explicit TransLeaderProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore, nullptr) {} +}; + +class AddPartProcessor : public BaseProcessor { +public: + static AddPartProcessor* instance(kvstore::KVStore* kvstore) { + return new AddPartProcessor(kvstore); + } + + void process(const cpp2::AddPartReq& req) { + UNUSED(req); + } + +private: + explicit AddPartProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore, nullptr) {} +}; + +class RemovePartProcessor : public BaseProcessor { +public: + static RemovePartProcessor* instance(kvstore::KVStore* kvstore) { + return new RemovePartProcessor(kvstore); + } + + void process(const cpp2::RemovePartReq& req) { + UNUSED(req); + } + +private: + explicit RemovePartProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore, nullptr) {} +}; + +class MemberChangeProcessor : public BaseProcessor { +public: + static MemberChangeProcessor* instance(kvstore::KVStore* kvstore) { + return new MemberChangeProcessor(kvstore); + } + + void process(const cpp2::MemberChangeReq& req) { + UNUSED(req); + } + +private: + explicit MemberChangeProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore, nullptr) {} +}; + +class AddLearnerProcessor : public BaseProcessor { +public: + static AddLearnerProcessor* instance(kvstore::KVStore* kvstore) { + return new AddLearnerProcessor(kvstore); + } + + void process(const cpp2::AddLearnerReq& req) { + UNUSED(req); + } + +private: + explicit AddLearnerProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore, nullptr) {} +}; + +class WaitingForCatchUpDataProcessor : public BaseProcessor { +public: + static WaitingForCatchUpDataProcessor* instance(kvstore::KVStore* kvstore) { + return new WaitingForCatchUpDataProcessor(kvstore); + } + + void process(const cpp2::CatchUpDataReq& req) { + UNUSED(req); + } + +private: + explicit WaitingForCatchUpDataProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore, nullptr) {} +}; +} // namespace storage +} // namespace nebula + +#endif // STORAGE_TRANSLEADERPROCESSOR_H_ diff --git a/src/storage/StorageServiceHandler.cpp b/src/storage/StorageServiceHandler.cpp index d0bc9ff0002..8236704f318 100644 --- a/src/storage/StorageServiceHandler.cpp +++ b/src/storage/StorageServiceHandler.cpp @@ -12,6 +12,7 @@ #include "storage/QueryVertexPropsProcessor.h" #include "storage/QueryEdgePropsProcessor.h" #include "storage/QueryStatsProcessor.h" +#include "storage/AdminProcessor.h" #define RETURN_FUTURE(processor) \ auto f = processor->getFuture(); \ @@ -76,6 +77,43 @@ StorageServiceHandler::future_addEdges(const cpp2::AddEdgesRequest& req) { auto* processor = AddEdgesProcessor::instance(kvstore_, schemaMan_); RETURN_FUTURE(processor); } + +folly::Future +StorageServiceHandler::future_transLeader(const cpp2::TransLeaderReq& req) { + auto* processor = TransLeaderProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + +folly::Future +StorageServiceHandler::future_addPart(const cpp2::AddPartReq& req) { + auto* processor = AddPartProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + +folly::Future +StorageServiceHandler::future_addLearner(const cpp2::AddLearnerReq& req) { + auto* processor = AddLearnerProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + +folly::Future +StorageServiceHandler::future_waitingForCatchUpData(const cpp2::CatchUpDataReq& req) { + auto* processor = WaitingForCatchUpDataProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + +folly::Future +StorageServiceHandler::future_removePart(const cpp2::RemovePartReq& req) { + auto* processor = RemovePartProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + +folly::Future +StorageServiceHandler::future_memberChange(const cpp2::MemberChangeReq& req) { + auto* processor = MemberChangeProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + } // namespace storage } // namespace nebula diff --git a/src/storage/StorageServiceHandler.h b/src/storage/StorageServiceHandler.h index b0bdc9c0c3d..1469b243a69 100644 --- a/src/storage/StorageServiceHandler.h +++ b/src/storage/StorageServiceHandler.h @@ -49,6 +49,25 @@ class StorageServiceHandler final : public cpp2::StorageServiceSvIf { folly::Future future_addEdges(const cpp2::AddEdgesRequest& req) override; + // Admin operations + folly::Future + future_transLeader(const cpp2::TransLeaderReq& req) override; + + folly::Future + future_addPart(const cpp2::AddPartReq& req) override; + + folly::Future + future_addLearner(const cpp2::AddLearnerReq& req) override; + + folly::Future + future_waitingForCatchUpData(const cpp2::CatchUpDataReq& req) override; + + folly::Future + future_removePart(const cpp2::RemovePartReq& req) override; + + folly::Future + future_memberChange(const cpp2::MemberChangeReq& req) override; + private: kvstore::KVStore* kvstore_ = nullptr; meta::SchemaManager* schemaMan_;