Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RPC client getResponse implementation #3977

Merged
merged 13 commits into from
Mar 23, 2022
4 changes: 2 additions & 2 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ jobs:
;;
esac
working-directory: tests/
timeout-minutes: 2
timeout-minutes: 4
- name: Pytest
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} test
Expand Down Expand Up @@ -270,7 +270,7 @@ jobs:
run: |
make standalone-up
working-directory: tests/
timeout-minutes: 60
timeout-minutes: 4
- name: TCK
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} standalone-tck
Expand Down
9 changes: 6 additions & 3 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq
}
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainUpdateEdgeRequest& r) {
return client->future_chainUpdateEdge(r);
});
Expand Down Expand Up @@ -102,7 +103,8 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion);
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainAddEdgesRequest& r) {
return client->future_chainAddEdges(r);
});
Expand Down Expand Up @@ -158,7 +160,8 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
chainReq.term_ref() = termId;
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) {
return client->future_chainDeleteEdges(r);
});
Expand Down
19 changes: 5 additions & 14 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(cbStatus.status());
}

std::pair<HostAddr, cpp2::UpdateVertexRequest> request;

DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(param.space);
if (!status.ok()) {
Expand All @@ -370,7 +368,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(host.status());
}
request.first = std::move(host).value();
cpp2::UpdateVertexRequest req;
req.space_id_ref() = param.space;
req.vertex_id_ref() = vertexId;
Expand All @@ -383,10 +380,10 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
if (condition.size() > 0) {
req.condition_ref() = std::move(condition);
}
request.second = std::move(req);

return getResponse(param.evb,
std::move(request),
host.value(),
req,
[](ThriftClientType* client, const cpp2::UpdateVertexRequest& r) {
return client->future_updateVertex(r);
});
Expand All @@ -405,8 +402,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(cbStatus.status());
}

std::pair<HostAddr, cpp2::UpdateEdgeRequest> request;

DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(space);
if (!status.ok()) {
Expand All @@ -423,7 +418,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(host.status());
}
request.first = std::move(host).value();
cpp2::UpdateEdgeRequest req;
req.space_id_ref() = space;
req.edge_key_ref() = edgeKey;
Expand All @@ -435,10 +429,10 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
if (condition.size() > 0) {
req.condition_ref() = std::move(condition);
}
request.second = std::move(req);

return getResponse(param.evb,
std::move(request),
host.value(),
req,
[useExperimentalFeature = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::UpdateEdgeRequest& r) {
return useExperimentalFeature ? client->future_chainUpdateEdge(r)
Expand All @@ -449,7 +443,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
folly::Future<StatusOr<cpp2::GetUUIDResp>> StorageClient::getUUID(GraphSpaceID space,
const std::string& name,
folly::EventBase* evb) {
std::pair<HostAddr, cpp2::GetUUIDReq> request;
DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(space);
if (!status.ok()) {
Expand All @@ -466,15 +459,13 @@ folly::Future<StatusOr<cpp2::GetUUIDResp>> StorageClient::getUUID(GraphSpaceID s
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::GetUUIDResp>>(host.status());
}
request.first = std::move(host).value();
cpp2::GetUUIDReq req;
req.space_id_ref() = space;
req.part_id_ref() = part;
req.name_ref() = name;
request.second = std::move(req);

return getResponse(
evb, std::move(request), [](ThriftClientType* client, const cpp2::GetUUIDReq& r) {
evb, host.value(), req, [](ThriftClientType* client, const cpp2::GetUUIDReq& r) {
return client->future_getUUID(r);
});
}
Expand Down
Loading