From 0bd07ad3c01c18808a9a4e814d1cd8100b0112f8 Mon Sep 17 00:00:00 2001 From: dangleptr <37216992+dangleptr@users.noreply.github.com> Date: Fri, 29 Mar 2019 10:18:40 +0800 Subject: [PATCH] Add meta client async interfaces (#238) --- src/meta/client/MetaClient.cpp | 98 ++++++++++++++------------ src/meta/client/MetaClient.h | 32 +++++++-- src/meta/test/MetaClientTest.cpp | 20 +++--- src/storage/test/StorageClientTest.cpp | 5 +- 4 files changed, 93 insertions(+), 62 deletions(-) diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index ae1a2dbb604..0334b7f09f5 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -48,7 +48,7 @@ void MetaClient::init() { } void MetaClient::loadDataThreadFunc() { - auto ret = listSpaces(); + auto ret = listSpaces().get(); if (!ret.ok()) { LOG(ERROR) << "List space failed!"; return; @@ -57,7 +57,7 @@ void MetaClient::loadDataThreadFunc() { decltype(spaceIndexByName_) indexByName; for (auto space : ret.value()) { auto spaceId = space.first; - auto r = getPartsAlloc(spaceId); + auto r = getPartsAlloc(spaceId).get(); if (!r.ok()) { LOG(ERROR) << "Get parts allocaction failed for spaceId " << spaceId; return; @@ -91,46 +91,57 @@ MetaClient::reverse(const PartsAlloc& parts) { return hosts; } -template -Response MetaClient::collectResponse(Request req, - RemoteFunc remoteFunc) { - folly::Promise pro; +template +folly::Future> MetaClient::getResponse( + Request req, + RemoteFunc remoteFunc, + RespGenerator respGen) { + folly::Promise> pro; auto f = pro.getFuture(); auto* evb = ioThreadPool_->getEventBase(); auto client = clientsMan_->client(active_, evb); - remoteFunc(client, std::move(req)).then(evb, [&](folly::Try&& resp) { - pro.setValue(resp.value()); + remoteFunc(client, std::move(req)) + .then(evb, [p = std::move(pro), respGen, this] (folly::Try&& t) mutable { + if (t.hasException()) { + LOG(ERROR) << "Rpc failed!"; + } else { + auto&& resp = t.value(); + if (resp.code != cpp2::ErrorCode::SUCCEEDED) { + p.setValue(this->handleResponse(resp)); + } + p.setValue(respGen(std::move(resp))); + } }); - return std::move(f).get(); + return f; } -StatusOr +folly::Future> MetaClient::createSpace(std::string name, int32_t partsNum, int32_t replicaFactor) { cpp2::CreateSpaceReq req; req.set_space_name(std::move(name)); req.set_parts_num(partsNum); req.set_replica_factor(replicaFactor); - auto resp = collectResponse(std::move(req), [] (auto client, auto request) { - return client->future_createSpace(request); - }); - if (resp.code != cpp2::ErrorCode::SUCCEEDED) { - return handleResponse(resp); - } - return resp.get_id().get_space_id(); + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_createSpace(request); + }, [] (cpp2::ExecResp&& resp) -> GraphSpaceID { + return resp.get_id().get_space_id(); + }); } -StatusOr> MetaClient::listSpaces() { +folly::Future>> MetaClient::listSpaces() { cpp2::ListSpacesReq req; - auto resp = collectResponse(std::move(req), [] (auto client, auto request) { + return getResponse(std::move(req), [] (auto client, auto request) { return client->future_listSpaces(request); + }, [this] (cpp2::ListSpacesResp&& resp) -> decltype(auto) { + return this->toSpaceIdName(resp.get_spaces()); }); - if (resp.code != cpp2::ErrorCode::SUCCEEDED) { - return handleResponse(resp); - } - return toSpaceIdName(resp.get_spaces()); } -Status MetaClient::addHosts(const std::vector& hosts) { +folly::Future> MetaClient::addHosts(const std::vector& hosts) { std::vector thriftHosts; thriftHosts.resize(hosts.size()); std::transform(hosts.begin(), hosts.end(), thriftHosts.begin(), [](const auto& h) { @@ -141,38 +152,35 @@ Status MetaClient::addHosts(const std::vector& hosts) { }); cpp2::AddHostsReq req; req.set_hosts(std::move(thriftHosts)); - auto resp = collectResponse(std::move(req), [] (auto client, auto request) { + return getResponse(std::move(req), [] (auto client, auto request) { return client->future_addHosts(request); + }, [] (cpp2::ExecResp&& resp) -> bool { + return resp.code == cpp2::ErrorCode::SUCCEEDED; }); - return handleResponse(resp); } -StatusOr> MetaClient::listHosts() { +folly::Future>> MetaClient::listHosts() { cpp2::ListHostsReq req; - auto resp = collectResponse(std::move(req), [] (auto client, auto request) { - return client->future_listHosts(request); - }); - if (resp.code != cpp2::ErrorCode::SUCCEEDED) { - return handleResponse(resp); - } - return to(resp.hosts); + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_listHosts(request); + }, [this] (cpp2::ListHostsResp&& resp) -> decltype(auto) { + return this->to(resp.hosts); + }); } -StatusOr>> +folly::Future>>> MetaClient::getPartsAlloc(GraphSpaceID spaceId) { cpp2::GetPartsAllocReq req; req.set_space_id(spaceId); - auto resp = collectResponse(std::move(req), [] (auto client, auto request) { + return getResponse(std::move(req), [] (auto client, auto request) { return client->future_getPartsAlloc(request); - }); - if (resp.code != cpp2::ErrorCode::SUCCEEDED) { - return handleResponse(resp); - } - std::unordered_map> parts; - for (auto it = resp.parts.begin(); it != resp.parts.end(); it++) { - parts.emplace(it->first, to(it->second)); - } - return parts; + }, [this] (cpp2::GetPartsAllocResp&& resp) -> decltype(auto) { + std::unordered_map> parts; + for (auto it = resp.parts.begin(); it != resp.parts.end(); it++) { + parts.emplace(it->first, to(it->second)); + } + return parts; + }); } StatusOr diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index be81dd7273b..61ff426f376 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -56,16 +56,22 @@ class MetaClient { /** * TODO(dangleptr): Use one struct to represent space description. * */ - StatusOr createSpace(std::string name, int32_t partsNum, int32_t replicaFator); + folly::Future> + createSpace(std::string name, int32_t partsNum, int32_t replicaFator); - StatusOr> listSpaces(); + folly::Future>> + listSpaces(); - Status addHosts(const std::vector& hosts); + folly::Future> + addHosts(const std::vector& hosts); - StatusOr> listHosts(); + folly::Future>> + listHosts(); - StatusOr getPartsAlloc(GraphSpaceID spaceId); + folly::Future> + getPartsAlloc(GraphSpaceID spaceId); + // These are the interfaces about cache opeartions. StatusOr getSpaceIdByNameFromCache(const std::string& name); PartsMap getPartsMapFromCache(const HostAddr& host); @@ -107,6 +113,21 @@ class MetaClient { > Response collectResponse(Request req, RemoteFunc remoteFunc); + template, Request) + >::type::value_type, + class Response = + typename std::result_of::type + > + folly::Future> getResponse( + Request req, + RemoteFunc remoteFunc, + RespGenerator respGen); + std::vector to(const std::vector& hosts); std::vector toSpaceIdName(const std::vector& tIdNames); @@ -127,7 +148,6 @@ class MetaClient { folly::RWSpinLock localCacheLock_; MetaChangedListener* listener_{nullptr}; }; - } // namespace meta } // namespace nebula #endif // META_METACLIENT_H_ diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index d0c8c301b35..f0df1675a78 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -32,27 +32,28 @@ TEST(MetaClientTest, InterfacesTest) { { // Test addHost, listHosts interface. std::vector hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}}; - ASSERT_EQ(Status::OK(), client->addHosts(hosts)); - auto ret = client->listHosts(); + auto r = client->addHosts(hosts).get(); + ASSERT_TRUE(r.ok()); + auto ret = client->listHosts().get(); ASSERT_TRUE(ret.ok()); ASSERT_EQ(hosts, ret.value()); } { // Test createSpace, listSpaces, getPartsAlloc. { - auto ret = client->createSpace("default_space", 9, 3); + auto ret = client->createSpace("default_space", 9, 3).get(); ASSERT_TRUE(ret.ok()) << ret.status(); spaceId = ret.value(); } { - auto ret = client->listSpaces(); + auto ret = client->listSpaces().get(); ASSERT_TRUE(ret.ok()) << ret.status(); ASSERT_EQ(ret.value().size(), 1); ASSERT_EQ(ret.value()[0].first, 1); ASSERT_EQ(ret.value()[0].second, "default_space"); } { - auto ret = client->getPartsAlloc(spaceId); + auto ret = client->getPartsAlloc(spaceId).get(); ASSERT_TRUE(ret.ok()) << ret.status(); for (auto it = ret.value().begin(); it != ret.value().end(); it++) { auto startIndex = it->first; @@ -144,20 +145,21 @@ TEST(MetaClientTest, DiffTest) { { // Test addHost, listHosts interface. std::vector hosts = {{0, 0}}; - ASSERT_EQ(Status::OK(), client->addHosts(hosts)); - auto ret = client->listHosts(); + auto r = client->addHosts(hosts).get(); + ASSERT_TRUE(r.ok()); + auto ret = client->listHosts().get(); ASSERT_TRUE(ret.ok()); ASSERT_EQ(hosts, ret.value()); } { - auto ret = client->createSpace("default_space", 9, 1); + auto ret = client->createSpace("default_space", 9, 1).get(); ASSERT_TRUE(ret.ok()) << ret.status(); } sleep(FLAGS_load_data_interval_second + 1); ASSERT_EQ(listener->spaceNum, 1); ASSERT_EQ(listener->partNum, 9); { - auto ret = client->createSpace("default_space_1", 5, 1); + auto ret = client->createSpace("default_space_1", 5, 1).get(); ASSERT_TRUE(ret.ok()) << ret.status(); } sleep(FLAGS_load_data_interval_second + 1); diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp index 53265a0ad4e..7b13439a8d3 100644 --- a/src/storage/test/StorageClientTest.cpp +++ b/src/storage/test/StorageClientTest.cpp @@ -41,8 +41,9 @@ TEST(StorageClientTest, VerticesInterfacesTest) { LOG(INFO) << "Add hosts and create space...."; { auto mClient = std::make_unique(); - ASSERT_EQ(Status::OK(), mClient->addHosts({HostAddr(localIp, localDataPort)})); - auto ret = mClient->createSpace("default", 10, 1); + auto r = mClient->addHosts({HostAddr(localIp, localDataPort)}).get(); + ASSERT_TRUE(r.ok()); + auto ret = mClient->createSpace("default", 10, 1).get(); spaceId = ret.value(); } sleep(2 * FLAGS_load_data_interval_second + 1);