Skip to content

Commit

Permalink
Add meta client async interfaces (vesoft-inc#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
dangleptr authored and dutor committed Mar 29, 2019
1 parent cf755a1 commit 0bd07ad
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 62 deletions.
98 changes: 53 additions & 45 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void MetaClient::init() {
}

void MetaClient::loadDataThreadFunc() {
auto ret = listSpaces();
auto ret = listSpaces().get();
if (!ret.ok()) {
LOG(ERROR) << "List space failed!";
return;
Expand All @@ -57,7 +57,7 @@ void MetaClient::loadDataThreadFunc() {
decltype(spaceIndexByName_) indexByName;
for (auto space : ret.value()) {
auto spaceId = space.first;
auto r = getPartsAlloc(spaceId);
auto r = getPartsAlloc(spaceId).get();
if (!r.ok()) {
LOG(ERROR) << "Get parts allocaction failed for spaceId " << spaceId;
return;
Expand Down Expand Up @@ -91,46 +91,57 @@ MetaClient::reverse(const PartsAlloc& parts) {
return hosts;
}

template<typename Request, typename RemoteFunc, typename Response>
Response MetaClient::collectResponse(Request req,
RemoteFunc remoteFunc) {
folly::Promise<Response> pro;
template<typename Request,
typename RemoteFunc,
typename RespGenerator,
typename RpcResponse,
typename Response>
folly::Future<StatusOr<Response>> MetaClient::getResponse(
Request req,
RemoteFunc remoteFunc,
RespGenerator respGen) {
folly::Promise<StatusOr<Response>> pro;
auto f = pro.getFuture();
auto* evb = ioThreadPool_->getEventBase();
auto client = clientsMan_->client(active_, evb);
remoteFunc(client, std::move(req)).then(evb, [&](folly::Try<Response>&& resp) {
pro.setValue(resp.value());
remoteFunc(client, std::move(req))
.then(evb, [p = std::move(pro), respGen, this] (folly::Try<RpcResponse>&& t) mutable {
if (t.hasException()) {
LOG(ERROR) << "Rpc failed!";
} else {
auto&& resp = t.value();
if (resp.code != cpp2::ErrorCode::SUCCEEDED) {
p.setValue(this->handleResponse(resp));
}
p.setValue(respGen(std::move(resp)));
}
});
return std::move(f).get();
return f;
}

StatusOr<GraphSpaceID>
folly::Future<StatusOr<GraphSpaceID>>
MetaClient::createSpace(std::string name, int32_t partsNum, int32_t replicaFactor) {
cpp2::CreateSpaceReq req;
req.set_space_name(std::move(name));
req.set_parts_num(partsNum);
req.set_replica_factor(replicaFactor);
auto resp = collectResponse(std::move(req), [] (auto client, auto request) {
return client->future_createSpace(request);
});
if (resp.code != cpp2::ErrorCode::SUCCEEDED) {
return handleResponse(resp);
}
return resp.get_id().get_space_id();
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_createSpace(request);
}, [] (cpp2::ExecResp&& resp) -> GraphSpaceID {
return resp.get_id().get_space_id();
});
}

StatusOr<std::vector<SpaceIdName>> MetaClient::listSpaces() {
folly::Future<StatusOr<std::vector<SpaceIdName>>> MetaClient::listSpaces() {
cpp2::ListSpacesReq req;
auto resp = collectResponse(std::move(req), [] (auto client, auto request) {
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_listSpaces(request);
}, [this] (cpp2::ListSpacesResp&& resp) -> decltype(auto) {
return this->toSpaceIdName(resp.get_spaces());
});
if (resp.code != cpp2::ErrorCode::SUCCEEDED) {
return handleResponse(resp);
}
return toSpaceIdName(resp.get_spaces());
}

Status MetaClient::addHosts(const std::vector<HostAddr>& hosts) {
folly::Future<StatusOr<bool>> MetaClient::addHosts(const std::vector<HostAddr>& hosts) {
std::vector<nebula::cpp2::HostAddr> thriftHosts;
thriftHosts.resize(hosts.size());
std::transform(hosts.begin(), hosts.end(), thriftHosts.begin(), [](const auto& h) {
Expand All @@ -141,38 +152,35 @@ Status MetaClient::addHosts(const std::vector<HostAddr>& hosts) {
});
cpp2::AddHostsReq req;
req.set_hosts(std::move(thriftHosts));
auto resp = collectResponse(std::move(req), [] (auto client, auto request) {
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_addHosts(request);
}, [] (cpp2::ExecResp&& resp) -> bool {
return resp.code == cpp2::ErrorCode::SUCCEEDED;
});
return handleResponse(resp);
}

StatusOr<std::vector<HostAddr>> MetaClient::listHosts() {
folly::Future<StatusOr<std::vector<HostAddr>>> MetaClient::listHosts() {
cpp2::ListHostsReq req;
auto resp = collectResponse(std::move(req), [] (auto client, auto request) {
return client->future_listHosts(request);
});
if (resp.code != cpp2::ErrorCode::SUCCEEDED) {
return handleResponse(resp);
}
return to(resp.hosts);
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_listHosts(request);
}, [this] (cpp2::ListHostsResp&& resp) -> decltype(auto) {
return this->to(resp.hosts);
});
}

StatusOr<std::unordered_map<PartitionID, std::vector<HostAddr>>>
folly::Future<StatusOr<std::unordered_map<PartitionID, std::vector<HostAddr>>>>
MetaClient::getPartsAlloc(GraphSpaceID spaceId) {
cpp2::GetPartsAllocReq req;
req.set_space_id(spaceId);
auto resp = collectResponse(std::move(req), [] (auto client, auto request) {
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_getPartsAlloc(request);
});
if (resp.code != cpp2::ErrorCode::SUCCEEDED) {
return handleResponse(resp);
}
std::unordered_map<PartitionID, std::vector<HostAddr>> parts;
for (auto it = resp.parts.begin(); it != resp.parts.end(); it++) {
parts.emplace(it->first, to(it->second));
}
return parts;
}, [this] (cpp2::GetPartsAllocResp&& resp) -> decltype(auto) {
std::unordered_map<PartitionID, std::vector<HostAddr>> parts;
for (auto it = resp.parts.begin(); it != resp.parts.end(); it++) {
parts.emplace(it->first, to(it->second));
}
return parts;
});
}

StatusOr<GraphSpaceID>
Expand Down
32 changes: 26 additions & 6 deletions src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,22 @@ class MetaClient {
/**
* TODO(dangleptr): Use one struct to represent space description.
* */
StatusOr<GraphSpaceID> createSpace(std::string name, int32_t partsNum, int32_t replicaFator);
folly::Future<StatusOr<GraphSpaceID>>
createSpace(std::string name, int32_t partsNum, int32_t replicaFator);

StatusOr<std::vector<SpaceIdName>> listSpaces();
folly::Future<StatusOr<std::vector<SpaceIdName>>>
listSpaces();

Status addHosts(const std::vector<HostAddr>& hosts);
folly::Future<StatusOr<bool>>
addHosts(const std::vector<HostAddr>& hosts);

StatusOr<std::vector<HostAddr>> listHosts();
folly::Future<StatusOr<std::vector<HostAddr>>>
listHosts();

StatusOr<PartsAlloc> getPartsAlloc(GraphSpaceID spaceId);
folly::Future<StatusOr<PartsAlloc>>
getPartsAlloc(GraphSpaceID spaceId);

// These are the interfaces about cache opeartions.
StatusOr<GraphSpaceID> getSpaceIdByNameFromCache(const std::string& name);

PartsMap getPartsMapFromCache(const HostAddr& host);
Expand Down Expand Up @@ -107,6 +113,21 @@ class MetaClient {
>
Response collectResponse(Request req, RemoteFunc remoteFunc);

template<class Request,
class RemoteFunc,
class RespGenerator,
class RpcResponse =
typename std::result_of<
RemoteFunc(std::shared_ptr<meta::cpp2::MetaServiceAsyncClient>, Request)
>::type::value_type,
class Response =
typename std::result_of<RespGenerator(RpcResponse)>::type
>
folly::Future<StatusOr<Response>> getResponse(
Request req,
RemoteFunc remoteFunc,
RespGenerator respGen);

std::vector<HostAddr> to(const std::vector<nebula::cpp2::HostAddr>& hosts);

std::vector<SpaceIdName> toSpaceIdName(const std::vector<cpp2::IdName>& tIdNames);
Expand All @@ -127,7 +148,6 @@ class MetaClient {
folly::RWSpinLock localCacheLock_;
MetaChangedListener* listener_{nullptr};
};

} // namespace meta
} // namespace nebula
#endif // META_METACLIENT_H_
20 changes: 11 additions & 9 deletions src/meta/test/MetaClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,28 @@ TEST(MetaClientTest, InterfacesTest) {
{
// Test addHost, listHosts interface.
std::vector<HostAddr> hosts = {{0, 0}, {1, 1}, {2, 2}, {3, 3}};
ASSERT_EQ(Status::OK(), client->addHosts(hosts));
auto ret = client->listHosts();
auto r = client->addHosts(hosts).get();
ASSERT_TRUE(r.ok());
auto ret = client->listHosts().get();
ASSERT_TRUE(ret.ok());
ASSERT_EQ(hosts, ret.value());
}
{
// Test createSpace, listSpaces, getPartsAlloc.
{
auto ret = client->createSpace("default_space", 9, 3);
auto ret = client->createSpace("default_space", 9, 3).get();
ASSERT_TRUE(ret.ok()) << ret.status();
spaceId = ret.value();
}
{
auto ret = client->listSpaces();
auto ret = client->listSpaces().get();
ASSERT_TRUE(ret.ok()) << ret.status();
ASSERT_EQ(ret.value().size(), 1);
ASSERT_EQ(ret.value()[0].first, 1);
ASSERT_EQ(ret.value()[0].second, "default_space");
}
{
auto ret = client->getPartsAlloc(spaceId);
auto ret = client->getPartsAlloc(spaceId).get();
ASSERT_TRUE(ret.ok()) << ret.status();
for (auto it = ret.value().begin(); it != ret.value().end(); it++) {
auto startIndex = it->first;
Expand Down Expand Up @@ -144,20 +145,21 @@ TEST(MetaClientTest, DiffTest) {
{
// Test addHost, listHosts interface.
std::vector<HostAddr> hosts = {{0, 0}};
ASSERT_EQ(Status::OK(), client->addHosts(hosts));
auto ret = client->listHosts();
auto r = client->addHosts(hosts).get();
ASSERT_TRUE(r.ok());
auto ret = client->listHosts().get();
ASSERT_TRUE(ret.ok());
ASSERT_EQ(hosts, ret.value());
}
{
auto ret = client->createSpace("default_space", 9, 1);
auto ret = client->createSpace("default_space", 9, 1).get();
ASSERT_TRUE(ret.ok()) << ret.status();
}
sleep(FLAGS_load_data_interval_second + 1);
ASSERT_EQ(listener->spaceNum, 1);
ASSERT_EQ(listener->partNum, 9);
{
auto ret = client->createSpace("default_space_1", 5, 1);
auto ret = client->createSpace("default_space_1", 5, 1).get();
ASSERT_TRUE(ret.ok()) << ret.status();
}
sleep(FLAGS_load_data_interval_second + 1);
Expand Down
5 changes: 3 additions & 2 deletions src/storage/test/StorageClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ TEST(StorageClientTest, VerticesInterfacesTest) {
LOG(INFO) << "Add hosts and create space....";
{
auto mClient = std::make_unique<meta::MetaClient>();
ASSERT_EQ(Status::OK(), mClient->addHosts({HostAddr(localIp, localDataPort)}));
auto ret = mClient->createSpace("default", 10, 1);
auto r = mClient->addHosts({HostAddr(localIp, localDataPort)}).get();
ASSERT_TRUE(r.ok());
auto ret = mClient->createSpace("default", 10, 1).get();
spaceId = ret.value();
}
sleep(2 * FLAGS_load_data_interval_second + 1);
Expand Down

0 comments on commit 0bd07ad

Please sign in to comment.