Skip to content

Commit

Permalink
Refactor RPC client getResponse implementation (#3977)
Browse files Browse the repository at this point in the history
* Refactor storage client

* Fix RPC TIMEOUT error caused by thrift client

* Cleanup

* Fix request usage by reference

* Minor improvement

* Replace result_of with invoke_result

* Cleanup

* Wait more times
  • Loading branch information
yixinglu authored Mar 23, 2022
1 parent dd270fb commit bdc95fe
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 255 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ jobs:
;;
esac
working-directory: tests/
timeout-minutes: 2
timeout-minutes: 4
- name: Pytest
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} test
Expand Down Expand Up @@ -270,7 +270,7 @@ jobs:
run: |
make standalone-up
working-directory: tests/
timeout-minutes: 60
timeout-minutes: 4
- name: TCK
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} standalone-tck
Expand Down
9 changes: 6 additions & 3 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq
}
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainUpdateEdgeRequest& r) {
return client->future_chainUpdateEdge(r);
});
Expand Down Expand Up @@ -102,7 +103,8 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion);
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainAddEdgesRequest& r) {
return client->future_chainAddEdges(r);
});
Expand Down Expand Up @@ -158,7 +160,8 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
chainReq.term_ref() = termId;
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
leader,
chainReq,
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) {
return client->future_chainDeleteEdges(r);
});
Expand Down
19 changes: 5 additions & 14 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(cbStatus.status());
}

std::pair<HostAddr, cpp2::UpdateVertexRequest> request;

DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(param.space);
if (!status.ok()) {
Expand All @@ -370,7 +368,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(host.status());
}
request.first = std::move(host).value();
cpp2::UpdateVertexRequest req;
req.space_id_ref() = param.space;
req.vertex_id_ref() = vertexId;
Expand All @@ -383,10 +380,10 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateVert
if (condition.size() > 0) {
req.condition_ref() = std::move(condition);
}
request.second = std::move(req);

return getResponse(param.evb,
std::move(request),
host.value(),
req,
[](ThriftClientType* client, const cpp2::UpdateVertexRequest& r) {
return client->future_updateVertex(r);
});
Expand All @@ -405,8 +402,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(cbStatus.status());
}

std::pair<HostAddr, cpp2::UpdateEdgeRequest> request;

DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(space);
if (!status.ok()) {
Expand All @@ -423,7 +418,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(host.status());
}
request.first = std::move(host).value();
cpp2::UpdateEdgeRequest req;
req.space_id_ref() = space;
req.edge_key_ref() = edgeKey;
Expand All @@ -435,10 +429,10 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
if (condition.size() > 0) {
req.condition_ref() = std::move(condition);
}
request.second = std::move(req);

return getResponse(param.evb,
std::move(request),
host.value(),
req,
[useExperimentalFeature = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::UpdateEdgeRequest& r) {
return useExperimentalFeature ? client->future_chainUpdateEdge(r)
Expand All @@ -449,7 +443,6 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> StorageClient::updateEdge
folly::Future<StatusOr<cpp2::GetUUIDResp>> StorageClient::getUUID(GraphSpaceID space,
const std::string& name,
folly::EventBase* evb) {
std::pair<HostAddr, cpp2::GetUUIDReq> request;
DCHECK(!!metaClient_);
auto status = metaClient_->partsNum(space);
if (!status.ok()) {
Expand All @@ -466,15 +459,13 @@ folly::Future<StatusOr<cpp2::GetUUIDResp>> StorageClient::getUUID(GraphSpaceID s
if (!host.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::GetUUIDResp>>(host.status());
}
request.first = std::move(host).value();
cpp2::GetUUIDReq req;
req.space_id_ref() = space;
req.part_id_ref() = part;
req.name_ref() = name;
request.second = std::move(req);

return getResponse(
evb, std::move(request), [](ThriftClientType* client, const cpp2::GetUUIDReq& r) {
evb, host.value(), req, [](ThriftClientType* client, const cpp2::GetUUIDReq& r) {
return client->future_getUUID(r);
});
}
Expand Down
Loading

0 comments on commit bdc95fe

Please sign in to comment.