From d1fd405ddbeae320470113de74be20b9a9d7c42e Mon Sep 17 00:00:00 2001 From: yixinglu <2520865+yixinglu@users.noreply.github.com> Date: Thu, 3 Mar 2022 23:18:43 +0800 Subject: [PATCH] Use semifuture interfaces --- src/clients/storage/StorageClient.cpp | 32 ++++++++++----------- src/clients/storage/StorageClientBase-inl.h | 32 ++++++++++----------- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/clients/storage/StorageClient.cpp b/src/clients/storage/StorageClient.cpp index 115f869883a..acbf5bf9f19 100644 --- a/src/clients/storage/StorageClient.cpp +++ b/src/clients/storage/StorageClient.cpp @@ -103,7 +103,7 @@ StorageRpcRespFuture StorageClient::getNeighbors( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetNeighborsRequest& r) { - return client->future_getNeighbors(r); + return client->semifuture_getNeighbors(r); }); } @@ -142,7 +142,7 @@ StorageRpcRespFuture StorageClient::addVertices( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::AddVerticesRequest& r) { - return client->future_addVertices(r); + return client->semifuture_addVertices(r); }); } @@ -180,8 +180,8 @@ StorageRpcRespFuture StorageClient::addEdges(const CommonReq std::move(requests), [useToss = param.useExperimentalFeature](ThriftClientType* client, const cpp2::AddEdgesRequest& r) { - return useToss ? client->future_chainAddEdges(r) - : client->future_addEdges(r); + return useToss ? client->semifuture_chainAddEdges(r) + : client->semifuture_addEdges(r); }); } @@ -237,7 +237,7 @@ StorageRpcRespFuture StorageClient::getProps( return collectResponse( param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetPropRequest& r) { - return client->future_getProps(r); + return client->semifuture_getProps(r); }); } @@ -270,8 +270,8 @@ StorageRpcRespFuture StorageClient::deleteEdges( std::move(requests), [useToss = param.useExperimentalFeature]( ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) { - return useToss ? client->future_chainDeleteEdges(r) - : client->future_deleteEdges(r); + return useToss ? client->semifuture_chainDeleteEdges(r) + : client->semifuture_deleteEdges(r); }); } @@ -303,7 +303,7 @@ StorageRpcRespFuture StorageClient::deleteVertices( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::DeleteVerticesRequest& r) { - return client->future_deleteVertices(r); + return client->semifuture_deleteVertices(r); }); } @@ -335,7 +335,7 @@ StorageRpcRespFuture StorageClient::deleteTags( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::DeleteTagsRequest& r) { - return client->future_deleteTags(r); + return client->semifuture_deleteTags(r); }); } @@ -523,7 +523,7 @@ StorageRpcRespFuture StorageClient::lookupIndex( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::LookupIndexRequest& r) { - return client->future_lookupIndex(r); + return client->semifuture_lookupIndex(r); }); } @@ -552,7 +552,7 @@ StorageRpcRespFuture StorageClient::lookupAndTravers return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::LookupAndTraverseRequest& r) { - return client->future_lookupAndTraverse(r); + return client->semifuture_lookupAndTraverse(r); }); } @@ -583,7 +583,7 @@ StorageRpcRespFuture StorageClient::scanEdge( return collectResponse( param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::ScanEdgeRequest& r) { - return client->future_scanEdge(r); + return client->semifuture_scanEdge(r); }); } @@ -615,7 +615,7 @@ StorageRpcRespFuture StorageClient::scanVertex( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::ScanVertexRequest& r) { - return client->future_scanVertex(r); + return client->semifuture_scanVertex(r); }); } @@ -641,7 +641,7 @@ folly::SemiFuture> StorageClient::get( return collectResponse( evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVGetRequest& r) { - return client->future_get(r); + return client->semifuture_get(r); }); } @@ -666,7 +666,7 @@ folly::SemiFuture> StorageClient::put( return collectResponse( evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVPutRequest& r) { - return client->future_put(r); + return client->semifuture_put(r); }); } @@ -691,7 +691,7 @@ folly::SemiFuture> StorageClient::remove( return collectResponse( evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVRemoveRequest& r) { - return client->future_remove(r); + return client->semifuture_remove(r); }); } diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index aa7a1799b03..9ad71d10f4f 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -65,37 +65,37 @@ StorageClientBase::collectResponse( folly::EventBase* evb, std::unordered_map requests, RemoteFunc&& remoteFunc) { + static_assert(folly::isSemiFuture< + typename std::result_of::type>::value, + "Must call the semifuture_Xxx interface"); evb = DCHECK_NOTNULL(ioThreadPool_)->getEventBase(); - std::vector> respFutures; + using RespLatencyPair = std::pair; + std::vector> respFutures; respFutures.reserve(requests.size()); - std::vector totalLatencies(requests.size()); std::unordered_map idxMap; idxMap.reserve(requests.size()); for (const auto& req : requests) { auto& host = req.first; - // Invoke the remote method auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms); - // Result is a pair of + idxMap.emplace(respFutures.size(), host); + auto start = time::WallClock::fastNowInMicroSec(); // Future process code will be executed on the IO thread // Since all requests are sent using the same eventbase, all // then-callback will be executed on the same IO thread - auto i = respFutures.size(); - idxMap.emplace(i, host); - auto fut = remoteFunc(client.get(), req.second).via(evb).ensure([i, start, &totalLatencies]() { - totalLatencies[i] = time::WallClock::fastNowInMicroSec() - start; + auto fut = remoteFunc(client.get(), req.second).via(evb).thenValue([start](Response&& resp) { + return std::make_pair(std::move(resp), time::WallClock::fastNowInMicroSec() - start); }); + respFutures.emplace_back(std::move(fut)); } return folly::collectAll(respFutures) - .deferValue([requests = std::move(requests), - totalLatencies = std::move(totalLatencies), - idxMap = std::move(idxMap), - this](std::vector>&& resps) { + .deferValue([requests = std::move(requests), idxMap = std::move(idxMap), this]( + std::vector>&& resps) { StorageRpcResponse rpcResp(requests.size()); DCHECK_EQ(resps.size(), requests.size()); for (size_t i = 0; i < resps.size(); i++) { @@ -120,12 +120,12 @@ StorageClientBase::collectResponse( } } else { auto resp = std::move(tryResp).value(); - auto& result = resp.get_result(); + auto& result = resp.first.get_result(); if (!result.get_failed_parts().empty()) { rpcResp.markFailure(); - for (auto& part : result.get_failed_parts()) { + for (const auto& part : result.get_failed_parts()) { auto partId = part.get_part_id(); auto code = part.get_code(); @@ -157,9 +157,9 @@ StorageClientBase::collectResponse( // Adjust the latency auto latency = result.get_latency_in_us(); - rpcResp.setLatency(host, latency, totalLatencies[i]); + rpcResp.setLatency(host, latency, resp.second); // Keep the response - rpcResp.addResponse(std::move(resp)); + rpcResp.addResponse(std::move(resp.first)); } }