Skip to content

Commit

Permalink
Use semifuture interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu committed Mar 3, 2022
1 parent 6354044 commit d1fd405
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 32 deletions.
32 changes: 16 additions & 16 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::getNeighbors(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::GetNeighborsRequest& r) {
return client->future_getNeighbors(r);
return client->semifuture_getNeighbors(r);
});
}

Expand Down Expand Up @@ -142,7 +142,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addVertices(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::AddVerticesRequest& r) {
return client->future_addVertices(r);
return client->semifuture_addVertices(r);
});
}

Expand Down Expand Up @@ -180,8 +180,8 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addEdges(const CommonReq
std::move(requests),
[useToss = param.useExperimentalFeature](ThriftClientType* client,
const cpp2::AddEdgesRequest& r) {
return useToss ? client->future_chainAddEdges(r)
: client->future_addEdges(r);
return useToss ? client->semifuture_chainAddEdges(r)
: client->semifuture_addEdges(r);
});
}

Expand Down Expand Up @@ -237,7 +237,7 @@ StorageRpcRespFuture<cpp2::GetPropResponse> StorageClient::getProps(

return collectResponse(
param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetPropRequest& r) {
return client->future_getProps(r);
return client->semifuture_getProps(r);
});
}

Expand Down Expand Up @@ -270,8 +270,8 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteEdges(
std::move(requests),
[useToss = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return useToss ? client->future_chainDeleteEdges(r)
: client->future_deleteEdges(r);
return useToss ? client->semifuture_chainDeleteEdges(r)
: client->semifuture_deleteEdges(r);
});
}

Expand Down Expand Up @@ -303,7 +303,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteVertices(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::DeleteVerticesRequest& r) {
return client->future_deleteVertices(r);
return client->semifuture_deleteVertices(r);
});
}

Expand Down Expand Up @@ -335,7 +335,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteTags(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::DeleteTagsRequest& r) {
return client->future_deleteTags(r);
return client->semifuture_deleteTags(r);
});
}

Expand Down Expand Up @@ -523,7 +523,7 @@ StorageRpcRespFuture<cpp2::LookupIndexResp> StorageClient::lookupIndex(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::LookupIndexRequest& r) {
return client->future_lookupIndex(r);
return client->semifuture_lookupIndex(r);
});
}

Expand Down Expand Up @@ -552,7 +552,7 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::lookupAndTravers
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::LookupAndTraverseRequest& r) {
return client->future_lookupAndTraverse(r);
return client->semifuture_lookupAndTraverse(r);
});
}

Expand Down Expand Up @@ -583,7 +583,7 @@ StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanEdge(

return collectResponse(
param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::ScanEdgeRequest& r) {
return client->future_scanEdge(r);
return client->semifuture_scanEdge(r);
});
}

Expand Down Expand Up @@ -615,7 +615,7 @@ StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanVertex(
return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::ScanVertexRequest& r) {
return client->future_scanVertex(r);
return client->semifuture_scanVertex(r);
});
}

Expand All @@ -641,7 +641,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::KVGetResponse>> StorageClient::get(

return collectResponse(
evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVGetRequest& r) {
return client->future_get(r);
return client->semifuture_get(r);
});
}

Expand All @@ -666,7 +666,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> StorageClient::put(

return collectResponse(
evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVPutRequest& r) {
return client->future_put(r);
return client->semifuture_put(r);
});
}

Expand All @@ -691,7 +691,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> StorageClient::remove(

return collectResponse(
evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVRemoveRequest& r) {
return client->future_remove(r);
return client->semifuture_remove(r);
});
}

Expand Down
32 changes: 16 additions & 16 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,37 +65,37 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
folly::EventBase* evb,
std::unordered_map<HostAddr, Request> requests,
RemoteFunc&& remoteFunc) {
static_assert(folly::isSemiFuture<
typename std::result_of<RemoteFunc(ClientType*, const Request&)>::type>::value,
"Must call the semifuture_Xxx interface");
evb = DCHECK_NOTNULL(ioThreadPool_)->getEventBase();

std::vector<folly::Future<Response>> respFutures;
using RespLatencyPair = std::pair<Response, int32_t>;
std::vector<folly::Future<RespLatencyPair>> respFutures;
respFutures.reserve(requests.size());

std::vector<int32_t> totalLatencies(requests.size());
std::unordered_map<size_t, HostAddr> idxMap;
idxMap.reserve(requests.size());

for (const auto& req : requests) {
auto& host = req.first;
// Invoke the remote method
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
// Result is a pair of <Request&, bool>
idxMap.emplace(respFutures.size(), host);

auto start = time::WallClock::fastNowInMicroSec();
// Future process code will be executed on the IO thread
// Since all requests are sent using the same eventbase, all
// then-callback will be executed on the same IO thread
auto i = respFutures.size();
idxMap.emplace(i, host);
auto fut = remoteFunc(client.get(), req.second).via(evb).ensure([i, start, &totalLatencies]() {
totalLatencies[i] = time::WallClock::fastNowInMicroSec() - start;
auto fut = remoteFunc(client.get(), req.second).via(evb).thenValue([start](Response&& resp) {
return std::make_pair(std::move(resp), time::WallClock::fastNowInMicroSec() - start);
});

respFutures.emplace_back(std::move(fut));
}

return folly::collectAll(respFutures)
.deferValue([requests = std::move(requests),
totalLatencies = std::move(totalLatencies),
idxMap = std::move(idxMap),
this](std::vector<folly::Try<Response>>&& resps) {
.deferValue([requests = std::move(requests), idxMap = std::move(idxMap), this](
std::vector<folly::Try<RespLatencyPair>>&& resps) {
StorageRpcResponse<Response> rpcResp(requests.size());
DCHECK_EQ(resps.size(), requests.size());
for (size_t i = 0; i < resps.size(); i++) {
Expand All @@ -120,12 +120,12 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
}
} else {
auto resp = std::move(tryResp).value();
auto& result = resp.get_result();
auto& result = resp.first.get_result();

if (!result.get_failed_parts().empty()) {
rpcResp.markFailure();

for (auto& part : result.get_failed_parts()) {
for (const auto& part : result.get_failed_parts()) {
auto partId = part.get_part_id();
auto code = part.get_code();

Expand Down Expand Up @@ -157,9 +157,9 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(

// Adjust the latency
auto latency = result.get_latency_in_us();
rpcResp.setLatency(host, latency, totalLatencies[i]);
rpcResp.setLatency(host, latency, resp.second);
// Keep the response
rpcResp.addResponse(std::move(resp));
rpcResp.addResponse(std::move(resp.first));
}
}

Expand Down

0 comments on commit d1fd405

Please sign in to comment.