From bdc95feaa648cb310268b52461150d56fe29e923 Mon Sep 17 00:00:00 2001 From: Yee <2520865+yixinglu@users.noreply.github.com> Date: Wed, 23 Mar 2022 19:52:34 +0800 Subject: [PATCH] Refactor RPC client getResponse implementation (#3977) * Refactor storage client * Fix RPC TIMEOUT error caused by thrift client * Cleanup * Fix request usage by reference * Minor improvement * Replace result_of with invoke_result * Cleanup * Wait more times --- .github/workflows/pull_request.yml | 4 +- src/clients/storage/InternalStorageClient.cpp | 9 +- src/clients/storage/StorageClient.cpp | 19 +- src/clients/storage/StorageClientBase-inl.h | 341 +++++++----------- src/clients/storage/StorageClientBase.h | 13 +- src/common/base/StatusOr.h | 17 + tests/Makefile | 4 +- tests/tck/conftest.py | 12 +- tests/tck/features/bugfix/LookupIn.feature | 1 + third-party/install-gcc.sh | 2 +- 10 files changed, 167 insertions(+), 255 deletions(-) diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index bdc3488fb80..16a41382348 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -161,7 +161,7 @@ jobs: ;; esac working-directory: tests/ - timeout-minutes: 2 + timeout-minutes: 4 - name: Pytest run: | make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} test @@ -270,7 +270,7 @@ jobs: run: | make standalone-up working-directory: tests/ - timeout-minutes: 60 + timeout-minutes: 4 - name: TCK run: | make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} standalone-tck diff --git a/src/clients/storage/InternalStorageClient.cpp b/src/clients/storage/InternalStorageClient.cpp index b2399485a85..ce1f4be8b1c 100644 --- a/src/clients/storage/InternalStorageClient.cpp +++ b/src/clients/storage/InternalStorageClient.cpp @@ -64,7 +64,8 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq } auto resp = getResponse( evb, - std::make_pair(leader, chainReq), + leader, + chainReq, [](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainUpdateEdgeRequest& r) { return client->future_chainUpdateEdge(r); }); @@ -102,7 +103,8 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq, cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion); auto resp = getResponse( evb, - std::make_pair(leader, chainReq), + leader, + chainReq, [](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainAddEdgesRequest& r) { return client->future_chainAddEdges(r); }); @@ -158,7 +160,8 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req, chainReq.term_ref() = termId; auto resp = getResponse( evb, - std::make_pair(leader, chainReq), + leader, + chainReq, [](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) { return client->future_chainDeleteEdges(r); }); diff --git a/src/clients/storage/StorageClient.cpp b/src/clients/storage/StorageClient.cpp index 115f869883a..c92f0951af6 100644 --- a/src/clients/storage/StorageClient.cpp +++ b/src/clients/storage/StorageClient.cpp @@ -352,8 +352,6 @@ folly::Future> StorageClient::updateVert return folly::makeFuture>(cbStatus.status()); } - std::pair request; - DCHECK(!!metaClient_); auto status = metaClient_->partsNum(param.space); if (!status.ok()) { @@ -370,7 +368,6 @@ folly::Future> StorageClient::updateVert if (!host.ok()) { return folly::makeFuture>(host.status()); } - request.first = std::move(host).value(); cpp2::UpdateVertexRequest req; req.space_id_ref() = param.space; req.vertex_id_ref() = vertexId; @@ -383,10 +380,10 @@ folly::Future> StorageClient::updateVert if (condition.size() > 0) { req.condition_ref() = std::move(condition); } - request.second = std::move(req); return getResponse(param.evb, - std::move(request), + host.value(), + req, [](ThriftClientType* client, const cpp2::UpdateVertexRequest& r) { return client->future_updateVertex(r); }); @@ -405,8 +402,6 @@ folly::Future> StorageClient::updateEdge return folly::makeFuture>(cbStatus.status()); } - std::pair request; - DCHECK(!!metaClient_); auto status = metaClient_->partsNum(space); if (!status.ok()) { @@ -423,7 +418,6 @@ folly::Future> StorageClient::updateEdge if (!host.ok()) { return folly::makeFuture>(host.status()); } - request.first = std::move(host).value(); cpp2::UpdateEdgeRequest req; req.space_id_ref() = space; req.edge_key_ref() = edgeKey; @@ -435,10 +429,10 @@ folly::Future> StorageClient::updateEdge if (condition.size() > 0) { req.condition_ref() = std::move(condition); } - request.second = std::move(req); return getResponse(param.evb, - std::move(request), + host.value(), + req, [useExperimentalFeature = param.useExperimentalFeature]( ThriftClientType* client, const cpp2::UpdateEdgeRequest& r) { return useExperimentalFeature ? client->future_chainUpdateEdge(r) @@ -449,7 +443,6 @@ folly::Future> StorageClient::updateEdge folly::Future> StorageClient::getUUID(GraphSpaceID space, const std::string& name, folly::EventBase* evb) { - std::pair request; DCHECK(!!metaClient_); auto status = metaClient_->partsNum(space); if (!status.ok()) { @@ -466,15 +459,13 @@ folly::Future> StorageClient::getUUID(GraphSpaceID s if (!host.ok()) { return folly::makeFuture>(host.status()); } - request.first = std::move(host).value(); cpp2::GetUUIDReq req; req.space_id_ref() = space; req.part_id_ref() = part; req.name_ref() = name; - request.second = std::move(req); return getResponse( - evb, std::move(request), [](ThriftClientType* client, const cpp2::GetUUIDReq& r) { + evb, host.value(), req, [](ThriftClientType* client, const cpp2::GetUUIDReq& r) { return client->future_getUUID(r); }); } diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index 00405c3a6f2..9a4e25d7eae 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -6,71 +6,26 @@ #ifndef CLIENTS_STORAGE_STORAGECLIENTBASE_INL_H #define CLIENTS_STORAGE_STORAGECLIENTBASE_INL_H +#include #include +#include + +#include +#include #include "clients/storage/stats/StorageClientStats.h" +#include "common/base/Logging.h" +#include "common/base/StatusOr.h" +#include "common/datatypes/HostAddr.h" #include "common/ssl/SSLConfig.h" #include "common/stats/StatsManager.h" +#include "common/thrift/ThriftTypes.h" #include "common/time/WallClock.h" +#include "interface/gen-cpp2/common_types.h" namespace nebula { namespace storage { -template -struct ResponseContext { - public: - ResponseContext(size_t reqsSent, RemoteFunc&& remoteFunc) - : resp(reqsSent), serverMethod(std::move(remoteFunc)) {} - - // Return true if processed all responses - bool finishSending() { - std::lock_guard g(lock_); - finishSending_ = true; - if (ongoingRequests_.empty() && !fulfilled_) { - fulfilled_ = true; - return true; - } else { - return false; - } - } - - std::pair insertRequest(HostAddr host, Request&& req) { - std::lock_guard g(lock_); - auto res = ongoingRequests_.emplace(host, std::move(req)); - return std::make_pair(&res.first->second, res.second); - } - - const Request& findRequest(HostAddr host) { - std::lock_guard g(lock_); - auto it = ongoingRequests_.find(host); - DCHECK(it != ongoingRequests_.end()); - return it->second; - } - - // Return true if processed all responses - bool removeRequest(HostAddr host) { - std::lock_guard g(lock_); - ongoingRequests_.erase(host); - if (finishSending_ && !fulfilled_ && ongoingRequests_.empty()) { - fulfilled_ = true; - return true; - } else { - return false; - } - } - - public: - folly::Promise> promise; - StorageRpcResponse resp; - RemoteFunc serverMethod; - - private: - std::mutex lock_; - std::unordered_map ongoingRequests_; - bool finishSending_{false}; - bool fulfilled_{false}; -}; - template StorageClientBase::StorageClientBase( std::shared_ptr threadPool, meta::MetaClient* metaClient) @@ -120,180 +75,138 @@ StorageClientBase::collectResponse( folly::EventBase* evb, std::unordered_map requests, RemoteFunc&& remoteFunc) { - using TransportException = apache::thrift::transport::TTransportException; - auto context = std::make_shared>( - requests.size(), std::move(remoteFunc)); - - DCHECK(!!ioThreadPool_); - - for (auto& req : requests) { - auto& host = req.first; - auto spaceId = req.second.get_space_id(); - auto res = context->insertRequest(host, std::move(req.second)); - DCHECK(res.second); - evb = ioThreadPool_->getEventBase(); - // Invoke the remote method - folly::via(evb, [this, evb, context, host, spaceId, res]() mutable { - auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms); - // Result is a pair of - auto start = time::WallClock::fastNowInMicroSec(); - context - ->serverMethod(client.get(), *res.first) - // Future process code will be executed on the IO thread - // Since all requests are sent using the same eventbase, all - // then-callback will be executed on the same IO thread - .via(evb) - .thenValue([this, context, host, spaceId, start](Response&& resp) { - auto& result = resp.get_result(); - bool hasFailure{false}; - for (auto& code : result.get_failed_parts()) { - VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code " - << static_cast(code.get_code()); - hasFailure = true; - context->resp.emplaceFailedPart(code.get_part_id(), code.get_code()); - if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - auto* leader = code.get_leader(); - if (isValidHostPtr(leader)) { - updateLeader(spaceId, code.get_part_id(), *leader); - } else { - invalidLeader(spaceId, code.get_part_id()); + std::vector>> respFutures; + respFutures.reserve(requests.size()); + + auto hosts = std::make_shared>(requests.size()); + auto totalLatencies = std::make_shared>(requests.size()); + + for (const auto& req : requests) { + auto start = time::WallClock::fastNowInMicroSec(); + + size_t i = respFutures.size(); + (*hosts)[i] = req.first; + // Future process code will be executed on the IO thread + // Since all requests are sent using the same eventbase, all + // then-callback will be executed on the same IO thread + auto fut = getResponse(evb, req.first, req.second, std::move(remoteFunc)) + .ensure([totalLatencies, i, start]() { + (*totalLatencies)[i] = time::WallClock::fastNowInMicroSec() - start; + }); + + respFutures.emplace_back(std::move(fut)); + } + + return folly::collectAll(respFutures) + .deferValue([this, requests = std::move(requests), totalLatencies, hosts]( + std::vector>>&& resps) { + StorageRpcResponse rpcResp(resps.size()); + for (size_t i = 0; i < resps.size(); i++) { + auto& host = hosts->at(i); + auto& tryResp = resps[i]; + std::optional errMsg; + if (tryResp.hasException()) { + errMsg = std::string(tryResp.exception().what().c_str()); + } else { + auto status = std::move(tryResp).value(); + if (status.ok()) { + auto resp = std::move(status).value(); + auto result = resp.get_result(); + + if (!result.get_failed_parts().empty()) { + rpcResp.markFailure(); + for (auto& part : result.get_failed_parts()) { + rpcResp.emplaceFailedPart(part.get_part_id(), part.get_code()); } - } else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND || - code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { - invalidLeader(spaceId, code.get_part_id()); - } else { - // do nothing } - } - if (hasFailure) { - context->resp.markFailure(); - } - // Adjust the latency - auto latency = result.get_latency_in_us(); - context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start); - - // Keep the response - context->resp.addResponse(std::move(resp)); - }) - .thenError(folly::tag_t{}, - [this, context, host, spaceId](TransportException&& ex) { - auto& r = context->findRequest(host); - auto parts = getReqPartsId(r); - if (ex.getType() == TransportException::TIMED_OUT) { - LOG(ERROR) << "Request to " << host << " time out: " << ex.what(); - } else { - invalidLeader(spaceId, parts); - LOG(ERROR) << "Request to " << host << " failed: " << ex.what(); - } - context->resp.appendFailedParts(parts, - nebula::cpp2::ErrorCode::E_RPC_FAILURE); - context->resp.markFailure(); - }) - .thenError(folly::tag_t{}, - [this, context, host, spaceId](std::exception&& ex) { - auto& r = context->findRequest(host); - auto parts = getReqPartsId(r); - LOG(ERROR) << "Request to " << host << " failed: " << ex.what(); - invalidLeader(spaceId, parts); - context->resp.appendFailedParts(parts, - nebula::cpp2::ErrorCode::E_RPC_FAILURE); - context->resp.markFailure(); - }) - .ensure([context, host] { - if (context->removeRequest(host)) { - // Received all responses - context->promise.setValue(std::move(context->resp)); + // Adjust the latency + auto latency = result.get_latency_in_us(); + rpcResp.setLatency(host, latency, totalLatencies->at(i)); + // Keep the response + rpcResp.addResponse(std::move(resp)); + } else { + errMsg = std::move(status).status().message(); } - }); - }); // via - } // for - - if (context->finishSending()) { - // Received all responses, most likely, all rpc failed - context->promise.setValue(std::move(context->resp)); - } - - return context->promise.getSemiFuture(); + } + + if (errMsg) { + rpcResp.markFailure(); + LOG(ERROR) << "There some RPC errors: " << errMsg.value(); + auto req = requests.at(host); + auto parts = getReqPartsId(req); + rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE); + } + } + + return rpcResp; + }); } template template folly::Future> StorageClientBase::getResponse( - folly::EventBase* evb, std::pair&& request, RemoteFunc&& remoteFunc) { - auto pro = std::make_shared>>(); - auto f = pro->getFuture(); - getResponseImpl( - evb, std::forward(request), std::forward(remoteFunc), pro); - return f; -} + folly::EventBase* evb, const HostAddr& host, const Request& request, RemoteFunc&& remoteFunc) { + static_assert( + folly::isFuture>::value); -template -template -void StorageClientBase::getResponseImpl( - folly::EventBase* evb, - std::pair request, - RemoteFunc remoteFunc, - std::shared_ptr>> pro) { stats::StatsManager::addValue(kNumRpcSentToStoraged); - using TransportException = apache::thrift::transport::TTransportException; if (evb == nullptr) { - DCHECK(!!ioThreadPool_); - evb = ioThreadPool_->getEventBase(); + evb = DCHECK_NOTNULL(ioThreadPool_)->getEventBase(); } - auto reqPtr = std::make_shared>(std::move(request.first), - std::move(request.second)); - folly::via( - evb, - [evb, request = std::move(reqPtr), remoteFunc = std::move(remoteFunc), pro, this]() mutable { - auto host = request->first; + + auto spaceId = request.get_space_id(); + return folly::via(evb) + .thenValue([remoteFunc = std::move(remoteFunc), request, evb, host, this](auto&&) { + // NOTE: Create new channel on each thread to avoid TIMEOUT RPC error auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms); - auto spaceId = request->second.get_space_id(); - auto partsId = getReqPartsId(request->second); - remoteFunc(client.get(), request->second) - .via(evb) - .thenValue([spaceId, pro, request, this](Response&& resp) mutable { - auto& result = resp.get_result(); - for (auto& code : result.get_failed_parts()) { - VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code " - << static_cast(code.get_code()); - if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - auto* leader = code.get_leader(); - if (isValidHostPtr(leader)) { - updateLeader(spaceId, code.get_part_id(), *leader); - } else { - invalidLeader(spaceId, code.get_part_id()); - } - } else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND || - code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { - invalidLeader(spaceId, code.get_part_id()); - } + return remoteFunc(client.get(), request); + }) + .thenValue([spaceId, this](Response&& resp) mutable -> StatusOr { + auto& result = resp.get_result(); + for (auto& part : result.get_failed_parts()) { + auto partId = part.get_part_id(); + auto code = part.get_code(); + + VLOG(3) << "Failure! Failed part " << partId << ", failed part " + << static_cast(code); + + switch (code) { + case nebula::cpp2::ErrorCode::E_LEADER_CHANGED: { + auto* leader = part.get_leader(); + if (isValidHostPtr(leader)) { + updateLeader(spaceId, partId, *leader); + } else { + invalidLeader(spaceId, partId); } - pro->setValue(std::move(resp)); - }) - .thenError(folly::tag_t{}, - [spaceId, partsId = std::move(partsId), host, pro, this]( - TransportException&& ex) mutable { - stats::StatsManager::addValue(kNumRpcSentToStoragedFailed); - if (ex.getType() == TransportException::TIMED_OUT) { - LOG(ERROR) << "Request to " << host << " time out: " << ex.what(); - } else { - invalidLeader(spaceId, partsId); - LOG(ERROR) << "Request to " << host << " failed: " << ex.what(); - } - pro->setValue(Status::Error( - folly::stringPrintf("RPC failure in StorageClient: %s", ex.what()))); - }) - .thenError(folly::tag_t{}, - [spaceId, partsId = std::move(partsId), host, pro, this]( - std::exception&& ex) mutable { - stats::StatsManager::addValue(kNumRpcSentToStoragedFailed); - // exception occurred during RPC - pro->setValue(Status::Error( - folly::stringPrintf("RPC failure in StorageClient: %s", ex.what()))); - invalidLeader(spaceId, partsId); - }); - }); // via + break; + } + case nebula::cpp2::ErrorCode::E_PART_NOT_FOUND: + case nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND: { + invalidLeader(spaceId, partId); + break; + } + default: + break; + } + } + return std::move(resp); + }) + .thenError([request, host, spaceId, this]( + folly::exception_wrapper&& exWrapper) mutable -> StatusOr { + stats::StatsManager::addValue(kNumRpcSentToStoragedFailed); + + using TransportException = apache::thrift::transport::TTransportException; + auto ex = exWrapper.get_exception(); + if (ex && ex->getType() == TransportException::TIMED_OUT) { + LOG(ERROR) << "Request to " << host << " time out: " << ex->what(); + } else { + auto partsId = getReqPartsId(request); + invalidLeader(spaceId, partsId); + LOG(ERROR) << "Request to " << host << " failed: " << ex->what(); + } + return Status::Error("RPC failure in StorageClient: %s", ex->what()); + }); } template diff --git a/src/clients/storage/StorageClientBase.h b/src/clients/storage/StorageClientBase.h index c86ee653207..1b2799c943f 100644 --- a/src/clients/storage/StorageClientBase.h +++ b/src/clients/storage/StorageClientBase.h @@ -12,6 +12,7 @@ #include "clients/meta/MetaClient.h" #include "common/base/Base.h" #include "common/base/StatusOr.h" +#include "common/datatypes/HostAddr.h" #include "common/meta/Common.h" #include "common/thrift/ThriftClientManager.h" #include "interface/gen-cpp2/storage_types.h" @@ -144,18 +145,10 @@ class StorageClientBase { class Response = typename std::result_of::type::value_type> folly::Future> getResponse(folly::EventBase* evb, - std::pair&& request, + const HostAddr& host, + const Request& request, RemoteFunc&& remoteFunc); - template ::type::value_type> - void getResponseImpl(folly::EventBase* evb, - std::pair request, - RemoteFunc remoteFunc, - std::shared_ptr>> pro); - // Cluster given ids into the host they belong to // The method returns a map // host_addr (A host, but in most case, the leader will be chosen) diff --git a/src/common/base/StatusOr.h b/src/common/base/StatusOr.h index 3aa166df51d..605f94b567d 100644 --- a/src/common/base/StatusOr.h +++ b/src/common/base/StatusOr.h @@ -6,6 +6,8 @@ #ifndef COMMON_BASE_STATUSOR_H_ #define COMMON_BASE_STATUSOR_H_ +#include + #include "common/base/Base.h" #include "common/base/Status.h" @@ -336,6 +338,21 @@ class StatusOr final { uint8_t state_; }; +namespace internal { +template +struct StatusOrValueType { + using type = T; +}; + +template +struct StatusOrValueType> { + using type = std::remove_cv_t>; +}; +} // namespace internal + +template +using status_or_value_t = typename internal::StatusOrValueType::type; + } // namespace nebula #endif // COMMON_BASE_STATUSOR_H_ diff --git a/tests/Makefile b/tests/Makefile index 2b5013a0a32..39a73466ca1 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -25,8 +25,8 @@ PASSWORD_LOCK_TIME_IN_SECS ?= 0 # commands gherkin_fmt = ~/.local/bin/reformat-gherkin run_test = PYTHONPATH=$$PYTHONPATH:$(CURR_DIR)/.. $(CURR_DIR)/nebula-test-run.py -test_without_skip = python3 -m pytest -m "not skip" -test_without_skip_sa = python3 -m pytest -m "not skip and not distonly" +test_without_skip = python3 -m pytest -m "not skip" --build_dir=$(BUILD_DIR) +test_without_skip_sa = python3 -m pytest -m "not skip and not distonly" --build_dir=$(BUILD_DIR) test_j = $(test_without_skip) -n$(J) test_j_sa = $(test_without_skip_sa) -n$(J) diff --git a/tests/tck/conftest.py b/tests/tck/conftest.py index 3af1f42eea4..8bec4b1b5c8 100644 --- a/tests/tck/conftest.py +++ b/tests/tck/conftest.py @@ -732,12 +732,8 @@ def execution_should_be_succ(exec_ctx): check_resp(rs, stmt) -@then( - rparse( - r"(?Pa|an) (?P\w+) should be raised at (?P