diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 517f78b1a6b..0cd5a90e134 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -159,7 +159,7 @@ jobs: ;; esac working-directory: tests/ - timeout-minutes: 2 + timeout-minutes: 4 - name: Pytest run: | make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} test @@ -266,7 +266,7 @@ jobs: run: | make standalone-up working-directory: tests/ - timeout-minutes: 60 + timeout-minutes: 4 - name: TCK run: | make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} standalone-tck diff --git a/src/clients/storage/InternalStorageClient.cpp b/src/clients/storage/InternalStorageClient.cpp index b2399485a85..9af4ff3a1b5 100644 --- a/src/clients/storage/InternalStorageClient.cpp +++ b/src/clients/storage/InternalStorageClient.cpp @@ -64,9 +64,10 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq } auto resp = getResponse( evb, - std::make_pair(leader, chainReq), + leader, + chainReq, [](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainUpdateEdgeRequest& r) { - return client->future_chainUpdateEdge(r); + return client->semifuture_chainUpdateEdge(r); }); std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable { @@ -102,9 +103,10 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq, cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion); auto resp = getResponse( evb, - std::make_pair(leader, chainReq), + leader, + chainReq, [](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainAddEdgesRequest& r) { - return client->future_chainAddEdges(r); + return client->semifuture_chainAddEdges(r); }); std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable { @@ -158,9 +160,10 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req, chainReq.term_ref() = termId; auto resp = getResponse( evb, - std::make_pair(leader, chainReq), + leader, + chainReq, [](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) { - return client->future_chainDeleteEdges(r); + return client->semifuture_chainDeleteEdges(r); }); std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable { diff --git a/src/clients/storage/StorageClient.cpp b/src/clients/storage/StorageClient.cpp index 115f869883a..3ea2761a41c 100644 --- a/src/clients/storage/StorageClient.cpp +++ b/src/clients/storage/StorageClient.cpp @@ -103,7 +103,7 @@ StorageRpcRespFuture StorageClient::getNeighbors( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetNeighborsRequest& r) { - return client->future_getNeighbors(r); + return client->semifuture_getNeighbors(r); }); } @@ -142,7 +142,7 @@ StorageRpcRespFuture StorageClient::addVertices( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::AddVerticesRequest& r) { - return client->future_addVertices(r); + return client->semifuture_addVertices(r); }); } @@ -180,8 +180,8 @@ StorageRpcRespFuture StorageClient::addEdges(const CommonReq std::move(requests), [useToss = param.useExperimentalFeature](ThriftClientType* client, const cpp2::AddEdgesRequest& r) { - return useToss ? client->future_chainAddEdges(r) - : client->future_addEdges(r); + return useToss ? client->semifuture_chainAddEdges(r) + : client->semifuture_addEdges(r); }); } @@ -237,7 +237,7 @@ StorageRpcRespFuture StorageClient::getProps( return collectResponse( param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetPropRequest& r) { - return client->future_getProps(r); + return client->semifuture_getProps(r); }); } @@ -270,8 +270,8 @@ StorageRpcRespFuture StorageClient::deleteEdges( std::move(requests), [useToss = param.useExperimentalFeature]( ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) { - return useToss ? client->future_chainDeleteEdges(r) - : client->future_deleteEdges(r); + return useToss ? client->semifuture_chainDeleteEdges(r) + : client->semifuture_deleteEdges(r); }); } @@ -303,7 +303,7 @@ StorageRpcRespFuture StorageClient::deleteVertices( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::DeleteVerticesRequest& r) { - return client->future_deleteVertices(r); + return client->semifuture_deleteVertices(r); }); } @@ -335,7 +335,7 @@ StorageRpcRespFuture StorageClient::deleteTags( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::DeleteTagsRequest& r) { - return client->future_deleteTags(r); + return client->semifuture_deleteTags(r); }); } @@ -352,8 +352,6 @@ folly::Future> StorageClient::updateVert return folly::makeFuture>(cbStatus.status()); } - std::pair request; - DCHECK(!!metaClient_); auto status = metaClient_->partsNum(param.space); if (!status.ok()) { @@ -370,7 +368,6 @@ folly::Future> StorageClient::updateVert if (!host.ok()) { return folly::makeFuture>(host.status()); } - request.first = std::move(host).value(); cpp2::UpdateVertexRequest req; req.space_id_ref() = param.space; req.vertex_id_ref() = vertexId; @@ -383,12 +380,12 @@ folly::Future> StorageClient::updateVert if (condition.size() > 0) { req.condition_ref() = std::move(condition); } - request.second = std::move(req); return getResponse(param.evb, - std::move(request), + host.value(), + req, [](ThriftClientType* client, const cpp2::UpdateVertexRequest& r) { - return client->future_updateVertex(r); + return client->semifuture_updateVertex(r); }); } @@ -405,8 +402,6 @@ folly::Future> StorageClient::updateEdge return folly::makeFuture>(cbStatus.status()); } - std::pair request; - DCHECK(!!metaClient_); auto status = metaClient_->partsNum(space); if (!status.ok()) { @@ -423,7 +418,6 @@ folly::Future> StorageClient::updateEdge if (!host.ok()) { return folly::makeFuture>(host.status()); } - request.first = std::move(host).value(); cpp2::UpdateEdgeRequest req; req.space_id_ref() = space; req.edge_key_ref() = edgeKey; @@ -435,21 +429,20 @@ folly::Future> StorageClient::updateEdge if (condition.size() > 0) { req.condition_ref() = std::move(condition); } - request.second = std::move(req); return getResponse(param.evb, - std::move(request), + host.value(), + req, [useExperimentalFeature = param.useExperimentalFeature]( ThriftClientType* client, const cpp2::UpdateEdgeRequest& r) { - return useExperimentalFeature ? client->future_chainUpdateEdge(r) - : client->future_updateEdge(r); + return useExperimentalFeature ? client->semifuture_chainUpdateEdge(r) + : client->semifuture_updateEdge(r); }); } folly::Future> StorageClient::getUUID(GraphSpaceID space, const std::string& name, folly::EventBase* evb) { - std::pair request; DCHECK(!!metaClient_); auto status = metaClient_->partsNum(space); if (!status.ok()) { @@ -466,16 +459,14 @@ folly::Future> StorageClient::getUUID(GraphSpaceID s if (!host.ok()) { return folly::makeFuture>(host.status()); } - request.first = std::move(host).value(); cpp2::GetUUIDReq req; req.space_id_ref() = space; req.part_id_ref() = part; req.name_ref() = name; - request.second = std::move(req); return getResponse( - evb, std::move(request), [](ThriftClientType* client, const cpp2::GetUUIDReq& r) { - return client->future_getUUID(r); + evb, host.value(), req, [](ThriftClientType* client, const cpp2::GetUUIDReq& r) { + return client->semifuture_getUUID(r); }); } @@ -523,7 +514,7 @@ StorageRpcRespFuture StorageClient::lookupIndex( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::LookupIndexRequest& r) { - return client->future_lookupIndex(r); + return client->semifuture_lookupIndex(r); }); } @@ -552,7 +543,7 @@ StorageRpcRespFuture StorageClient::lookupAndTravers return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::LookupAndTraverseRequest& r) { - return client->future_lookupAndTraverse(r); + return client->semifuture_lookupAndTraverse(r); }); } @@ -583,7 +574,7 @@ StorageRpcRespFuture StorageClient::scanEdge( return collectResponse( param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::ScanEdgeRequest& r) { - return client->future_scanEdge(r); + return client->semifuture_scanEdge(r); }); } @@ -615,7 +606,7 @@ StorageRpcRespFuture StorageClient::scanVertex( return collectResponse(param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::ScanVertexRequest& r) { - return client->future_scanVertex(r); + return client->semifuture_scanVertex(r); }); } @@ -641,7 +632,7 @@ folly::SemiFuture> StorageClient::get( return collectResponse( evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVGetRequest& r) { - return client->future_get(r); + return client->semifuture_get(r); }); } @@ -666,7 +657,7 @@ folly::SemiFuture> StorageClient::put( return collectResponse( evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVPutRequest& r) { - return client->future_put(r); + return client->semifuture_put(r); }); } @@ -691,7 +682,7 @@ folly::SemiFuture> StorageClient::remove( return collectResponse( evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVRemoveRequest& r) { - return client->future_remove(r); + return client->semifuture_remove(r); }); } diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index 00405c3a6f2..47879229bd2 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -6,71 +6,25 @@ #ifndef CLIENTS_STORAGE_STORAGECLIENTBASE_INL_H #define CLIENTS_STORAGE_STORAGECLIENTBASE_INL_H +#include +#include #include +#include + +#include #include "clients/storage/stats/StorageClientStats.h" +#include "common/base/StatusOr.h" +#include "common/datatypes/HostAddr.h" #include "common/ssl/SSLConfig.h" #include "common/stats/StatsManager.h" +#include "common/thrift/ThriftTypes.h" #include "common/time/WallClock.h" +#include "interface/gen-cpp2/common_types.h" namespace nebula { namespace storage { -template -struct ResponseContext { - public: - ResponseContext(size_t reqsSent, RemoteFunc&& remoteFunc) - : resp(reqsSent), serverMethod(std::move(remoteFunc)) {} - - // Return true if processed all responses - bool finishSending() { - std::lock_guard g(lock_); - finishSending_ = true; - if (ongoingRequests_.empty() && !fulfilled_) { - fulfilled_ = true; - return true; - } else { - return false; - } - } - - std::pair insertRequest(HostAddr host, Request&& req) { - std::lock_guard g(lock_); - auto res = ongoingRequests_.emplace(host, std::move(req)); - return std::make_pair(&res.first->second, res.second); - } - - const Request& findRequest(HostAddr host) { - std::lock_guard g(lock_); - auto it = ongoingRequests_.find(host); - DCHECK(it != ongoingRequests_.end()); - return it->second; - } - - // Return true if processed all responses - bool removeRequest(HostAddr host) { - std::lock_guard g(lock_); - ongoingRequests_.erase(host); - if (finishSending_ && !fulfilled_ && ongoingRequests_.empty()) { - fulfilled_ = true; - return true; - } else { - return false; - } - } - - public: - folly::Promise> promise; - StorageRpcResponse resp; - RemoteFunc serverMethod; - - private: - std::mutex lock_; - std::unordered_map ongoingRequests_; - bool finishSending_{false}; - bool fulfilled_{false}; -}; - template StorageClientBase::StorageClientBase( std::shared_ptr threadPool, meta::MetaClient* metaClient) @@ -120,180 +74,135 @@ StorageClientBase::collectResponse( folly::EventBase* evb, std::unordered_map requests, RemoteFunc&& remoteFunc) { - using TransportException = apache::thrift::transport::TTransportException; - auto context = std::make_shared>( - requests.size(), std::move(remoteFunc)); - - DCHECK(!!ioThreadPool_); - - for (auto& req : requests) { - auto& host = req.first; - auto spaceId = req.second.get_space_id(); - auto res = context->insertRequest(host, std::move(req.second)); - DCHECK(res.second); - evb = ioThreadPool_->getEventBase(); - // Invoke the remote method - folly::via(evb, [this, evb, context, host, spaceId, res]() mutable { - auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms); - // Result is a pair of - auto start = time::WallClock::fastNowInMicroSec(); - context - ->serverMethod(client.get(), *res.first) - // Future process code will be executed on the IO thread - // Since all requests are sent using the same eventbase, all - // then-callback will be executed on the same IO thread - .via(evb) - .thenValue([this, context, host, spaceId, start](Response&& resp) { - auto& result = resp.get_result(); - bool hasFailure{false}; - for (auto& code : result.get_failed_parts()) { - VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code " - << static_cast(code.get_code()); - hasFailure = true; - context->resp.emplaceFailedPart(code.get_part_id(), code.get_code()); - if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - auto* leader = code.get_leader(); - if (isValidHostPtr(leader)) { - updateLeader(spaceId, code.get_part_id(), *leader); - } else { - invalidLeader(spaceId, code.get_part_id()); + std::vector>> respFutures; + respFutures.reserve(requests.size()); + + auto hosts = std::make_shared>(requests.size()); + auto totalLatencies = std::make_shared>(requests.size()); + + for (const auto& req : requests) { + auto start = time::WallClock::fastNowInMicroSec(); + + size_t i = respFutures.size(); + (*hosts)[i] = req.first; + // Future process code will be executed on the IO thread + // Since all requests are sent using the same eventbase, all + // then-callback will be executed on the same IO thread + auto fut = getResponse(evb, req.first, req.second, std::move(remoteFunc)) + .ensure([totalLatencies, i, start]() { + (*totalLatencies)[i] = time::WallClock::fastNowInMicroSec() - start; + }); + + respFutures.emplace_back(std::move(fut)); + } + + return folly::collectAll(respFutures) + .deferValue([this, requests = std::move(requests), totalLatencies, hosts]( + std::vector>>&& resps) { + StorageRpcResponse rpcResp(resps.size()); + for (size_t i = 0; i < resps.size(); i++) { + auto& host = hosts->at(i); + auto& tryResp = resps[i]; + folly::Optional errMsg; + if (tryResp.hasException()) { + errMsg = std::string(tryResp.exception().what().c_str()); + } else { + auto status = std::move(tryResp).value(); + if (status.ok()) { + auto resp = std::move(status).value(); + auto result = resp.get_result(); + + if (!result.get_failed_parts().empty()) { + rpcResp.markFailure(); + for (auto& part : result.get_failed_parts()) { + rpcResp.emplaceFailedPart(part.get_part_id(), part.get_code()); } - } else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND || - code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { - invalidLeader(spaceId, code.get_part_id()); - } else { - // do nothing } - } - if (hasFailure) { - context->resp.markFailure(); - } - // Adjust the latency - auto latency = result.get_latency_in_us(); - context->resp.setLatency(host, latency, time::WallClock::fastNowInMicroSec() - start); - - // Keep the response - context->resp.addResponse(std::move(resp)); - }) - .thenError(folly::tag_t{}, - [this, context, host, spaceId](TransportException&& ex) { - auto& r = context->findRequest(host); - auto parts = getReqPartsId(r); - if (ex.getType() == TransportException::TIMED_OUT) { - LOG(ERROR) << "Request to " << host << " time out: " << ex.what(); - } else { - invalidLeader(spaceId, parts); - LOG(ERROR) << "Request to " << host << " failed: " << ex.what(); - } - context->resp.appendFailedParts(parts, - nebula::cpp2::ErrorCode::E_RPC_FAILURE); - context->resp.markFailure(); - }) - .thenError(folly::tag_t{}, - [this, context, host, spaceId](std::exception&& ex) { - auto& r = context->findRequest(host); - auto parts = getReqPartsId(r); - LOG(ERROR) << "Request to " << host << " failed: " << ex.what(); - invalidLeader(spaceId, parts); - context->resp.appendFailedParts(parts, - nebula::cpp2::ErrorCode::E_RPC_FAILURE); - context->resp.markFailure(); - }) - .ensure([context, host] { - if (context->removeRequest(host)) { - // Received all responses - context->promise.setValue(std::move(context->resp)); + // Adjust the latency + auto latency = result.get_latency_in_us(); + rpcResp.setLatency(host, latency, totalLatencies->at(i)); + // Keep the response + rpcResp.addResponse(std::move(resp)); + } else { + errMsg = std::move(status).status().message(); } - }); - }); // via - } // for - - if (context->finishSending()) { - // Received all responses, most likely, all rpc failed - context->promise.setValue(std::move(context->resp)); - } - - return context->promise.getSemiFuture(); + } + + if (errMsg) { + rpcResp.markFailure(); + LOG(ERROR) << "There some RPC errors: " << errMsg.value(); + auto req = requests.at(host); + auto parts = getReqPartsId(req); + rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE); + } + } + + return rpcResp; + }); } template template folly::Future> StorageClientBase::getResponse( - folly::EventBase* evb, std::pair&& request, RemoteFunc&& remoteFunc) { - auto pro = std::make_shared>>(); - auto f = pro->getFuture(); - getResponseImpl( - evb, std::forward(request), std::forward(remoteFunc), pro); - return f; -} + folly::EventBase* evb, const HostAddr& host, const Request& request, RemoteFunc&& remoteFunc) { + static_assert(folly::isSemiFuture< + typename std::result_of::type>::value); -template -template -void StorageClientBase::getResponseImpl( - folly::EventBase* evb, - std::pair request, - RemoteFunc remoteFunc, - std::shared_ptr>> pro) { stats::StatsManager::addValue(kNumRpcSentToStoraged); - using TransportException = apache::thrift::transport::TTransportException; if (evb == nullptr) { - DCHECK(!!ioThreadPool_); - evb = ioThreadPool_->getEventBase(); + evb = DCHECK_NOTNULL(ioThreadPool_)->getEventBase(); } - auto reqPtr = std::make_shared>(std::move(request.first), - std::move(request.second)); - folly::via( - evb, - [evb, request = std::move(reqPtr), remoteFunc = std::move(remoteFunc), pro, this]() mutable { - auto host = request->first; - auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms); - auto spaceId = request->second.get_space_id(); - auto partsId = getReqPartsId(request->second); - remoteFunc(client.get(), request->second) - .via(evb) - .thenValue([spaceId, pro, request, this](Response&& resp) mutable { - auto& result = resp.get_result(); - for (auto& code : result.get_failed_parts()) { - VLOG(3) << "Failure! Failed part " << code.get_part_id() << ", failed code " - << static_cast(code.get_code()); - if (code.get_code() == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - auto* leader = code.get_leader(); - if (isValidHostPtr(leader)) { - updateLeader(spaceId, code.get_part_id(), *leader); - } else { - invalidLeader(spaceId, code.get_part_id()); - } - } else if (code.get_code() == nebula::cpp2::ErrorCode::E_PART_NOT_FOUND || - code.get_code() == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { - invalidLeader(spaceId, code.get_part_id()); - } + + auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms); + auto spaceId = request.get_space_id(); + auto partsId = getReqPartsId(request); + return remoteFunc(client.get(), request) + .via(evb) + .thenValue([spaceId, this](Response&& resp) mutable -> StatusOr { + auto& result = resp.get_result(); + for (auto& part : result.get_failed_parts()) { + auto partId = part.get_part_id(); + auto code = part.get_code(); + + VLOG(3) << "Failure! Failed part " << partId << ", failed part " + << static_cast(code); + + switch (code) { + case nebula::cpp2::ErrorCode::E_LEADER_CHANGED: { + auto* leader = part.get_leader(); + if (isValidHostPtr(leader)) { + updateLeader(spaceId, partId, *leader); + } else { + invalidLeader(spaceId, partId); } - pro->setValue(std::move(resp)); - }) - .thenError(folly::tag_t{}, - [spaceId, partsId = std::move(partsId), host, pro, this]( - TransportException&& ex) mutable { - stats::StatsManager::addValue(kNumRpcSentToStoragedFailed); - if (ex.getType() == TransportException::TIMED_OUT) { - LOG(ERROR) << "Request to " << host << " time out: " << ex.what(); - } else { - invalidLeader(spaceId, partsId); - LOG(ERROR) << "Request to " << host << " failed: " << ex.what(); - } - pro->setValue(Status::Error( - folly::stringPrintf("RPC failure in StorageClient: %s", ex.what()))); - }) - .thenError(folly::tag_t{}, - [spaceId, partsId = std::move(partsId), host, pro, this]( - std::exception&& ex) mutable { - stats::StatsManager::addValue(kNumRpcSentToStoragedFailed); - // exception occurred during RPC - pro->setValue(Status::Error( - folly::stringPrintf("RPC failure in StorageClient: %s", ex.what()))); - invalidLeader(spaceId, partsId); - }); - }); // via + break; + } + case nebula::cpp2::ErrorCode::E_PART_NOT_FOUND: + case nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND: { + invalidLeader(spaceId, partId); + break; + } + default: + break; + } + } + return std::move(resp); + }) + .thenError([this, host, spaceId, partsId = std::move(partsId)]( + folly::exception_wrapper&& exWrapper) mutable -> StatusOr { + stats::StatsManager::addValue(kNumRpcSentToStoragedFailed); + + using TransportException = apache::thrift::transport::TTransportException; + auto ex = exWrapper.get_exception(); + if (ex && ex->getType() == TransportException::TIMED_OUT) { + LOG(ERROR) << "Request to " << host << " time out: " << ex->what(); + } else { + invalidLeader(spaceId, partsId); + LOG(ERROR) << "Request to " << host << " failed: " << ex->what(); + } + return Status::Error(folly::sformat("RPC failure in StorageClient: {}", ex->what())); + }); } template diff --git a/src/clients/storage/StorageClientBase.h b/src/clients/storage/StorageClientBase.h index c86ee653207..1b2799c943f 100644 --- a/src/clients/storage/StorageClientBase.h +++ b/src/clients/storage/StorageClientBase.h @@ -12,6 +12,7 @@ #include "clients/meta/MetaClient.h" #include "common/base/Base.h" #include "common/base/StatusOr.h" +#include "common/datatypes/HostAddr.h" #include "common/meta/Common.h" #include "common/thrift/ThriftClientManager.h" #include "interface/gen-cpp2/storage_types.h" @@ -144,18 +145,10 @@ class StorageClientBase { class Response = typename std::result_of::type::value_type> folly::Future> getResponse(folly::EventBase* evb, - std::pair&& request, + const HostAddr& host, + const Request& request, RemoteFunc&& remoteFunc); - template ::type::value_type> - void getResponseImpl(folly::EventBase* evb, - std::pair request, - RemoteFunc remoteFunc, - std::shared_ptr>> pro); - // Cluster given ids into the host they belong to // The method returns a map // host_addr (A host, but in most case, the leader will be chosen) diff --git a/src/common/base/StatusOr.h b/src/common/base/StatusOr.h index 3aa166df51d..605f94b567d 100644 --- a/src/common/base/StatusOr.h +++ b/src/common/base/StatusOr.h @@ -6,6 +6,8 @@ #ifndef COMMON_BASE_STATUSOR_H_ #define COMMON_BASE_STATUSOR_H_ +#include + #include "common/base/Base.h" #include "common/base/Status.h" @@ -336,6 +338,21 @@ class StatusOr final { uint8_t state_; }; +namespace internal { +template +struct StatusOrValueType { + using type = T; +}; + +template +struct StatusOrValueType> { + using type = std::remove_cv_t>; +}; +} // namespace internal + +template +using status_or_value_t = typename internal::StatusOrValueType::type; + } // namespace nebula #endif // COMMON_BASE_STATUSOR_H_ diff --git a/src/graph/executor/test/StorageServerStub.cpp b/src/graph/executor/test/StorageServerStub.cpp index 9ace772300d..9d05c0476d5 100644 --- a/src/graph/executor/test/StorageServerStub.cpp +++ b/src/graph/executor/test/StorageServerStub.cpp @@ -8,12 +8,12 @@ #include "common/base/Base.h" #include "storage/GraphStorageLocalServer.h" -#define LOCAL_RETURN_FUTURE(threadManager, respType, callFunc) \ - UNUSED(request); \ - auto promise = std::make_shared>(); \ - respType dummyResp; \ - auto f = promise->getFuture(); \ - promise->setValue(std::move(dummyResp)); \ +#define LOCAL_RETURN_SEMIFUTURE(threadManager, respType, callFunc) \ + UNUSED(request); \ + auto promise = std::make_shared>(); \ + respType dummyResp; \ + auto f = promise->getSemiFuture(); \ + promise->setValue(std::move(dummyResp)); \ return f; namespace nebula::storage { @@ -50,104 +50,104 @@ void GraphStorageLocalServer::stop() { serving_ = false; } -folly::Future GraphStorageLocalServer::future_getNeighbors( +folly::SemiFuture GraphStorageLocalServer::semifuture_getNeighbors( const cpp2::GetNeighborsRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetNeighborsResponse, future_getNeighbors); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::GetNeighborsResponse, semifuture_getNeighbors); } -folly::Future GraphStorageLocalServer::future_addVertices( +folly::SemiFuture GraphStorageLocalServer::semifuture_addVertices( const cpp2::AddVerticesRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_addVertices); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ExecResponse, semifuture_addVertices); } -folly::Future GraphStorageLocalServer::future_chainAddEdges( +folly::SemiFuture GraphStorageLocalServer::semifuture_chainAddEdges( const cpp2::AddEdgesRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_chainAddEdges); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ExecResponse, semifuture_chainAddEdges); } -folly::Future GraphStorageLocalServer::future_addEdges( +folly::SemiFuture GraphStorageLocalServer::semifuture_addEdges( const cpp2::AddEdgesRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_addEdges); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ExecResponse, semifuture_addEdges); } -folly::Future GraphStorageLocalServer::future_getProps( +folly::SemiFuture GraphStorageLocalServer::semifuture_getProps( const cpp2::GetPropRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetPropResponse, future_getProps); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::GetPropResponse, semifuture_getProps); } -folly::Future GraphStorageLocalServer::future_deleteEdges( +folly::SemiFuture GraphStorageLocalServer::semifuture_deleteEdges( const cpp2::DeleteEdgesRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_deleteEdges); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ExecResponse, semifuture_deleteEdges); } -folly::Future GraphStorageLocalServer::future_chainDeleteEdges( +folly::SemiFuture GraphStorageLocalServer::semifuture_chainDeleteEdges( const cpp2::DeleteEdgesRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_chainDeleteEdges); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ExecResponse, semifuture_chainDeleteEdges); } -folly::Future GraphStorageLocalServer::future_deleteVertices( +folly::SemiFuture GraphStorageLocalServer::semifuture_deleteVertices( const cpp2::DeleteVerticesRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_deleteVertices); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ExecResponse, semifuture_deleteVertices); } -folly::Future GraphStorageLocalServer::future_deleteTags( +folly::SemiFuture GraphStorageLocalServer::semifuture_deleteTags( const cpp2::DeleteTagsRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_deleteTags); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ExecResponse, semifuture_deleteTags); } -folly::Future GraphStorageLocalServer::future_updateVertex( +folly::SemiFuture GraphStorageLocalServer::semifuture_updateVertex( const cpp2::UpdateVertexRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, future_updateVertex); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::UpdateResponse, semifuture_updateVertex); } -folly::Future GraphStorageLocalServer::future_chainUpdateEdge( +folly::SemiFuture GraphStorageLocalServer::semifuture_chainUpdateEdge( const cpp2::UpdateEdgeRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, future_chainUpdateEdge); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::UpdateResponse, semifuture_chainUpdateEdge); } -folly::Future GraphStorageLocalServer::future_updateEdge( +folly::SemiFuture GraphStorageLocalServer::semifuture_updateEdge( const cpp2::UpdateEdgeRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, future_updateEdge); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::UpdateResponse, semifuture_updateEdge); } -folly::Future GraphStorageLocalServer::future_getUUID( +folly::SemiFuture GraphStorageLocalServer::semifuture_getUUID( const cpp2::GetUUIDReq& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetUUIDResp, future_getUUID); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::GetUUIDResp, semifuture_getUUID); } -folly::Future GraphStorageLocalServer::future_lookupIndex( +folly::SemiFuture GraphStorageLocalServer::semifuture_lookupIndex( const cpp2::LookupIndexRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::LookupIndexResp, future_lookupIndex); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::LookupIndexResp, semifuture_lookupIndex); } -folly::Future GraphStorageLocalServer::future_lookupAndTraverse( +folly::SemiFuture GraphStorageLocalServer::semifuture_lookupAndTraverse( const cpp2::LookupAndTraverseRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetNeighborsResponse, future_lookupAndTraverse); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::GetNeighborsResponse, semifuture_lookupAndTraverse); } -folly::Future GraphStorageLocalServer::future_scanVertex( +folly::SemiFuture GraphStorageLocalServer::semifuture_scanVertex( const cpp2::ScanVertexRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ScanResponse, future_scanVertex); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ScanResponse, semifuture_scanVertex); } -folly::Future GraphStorageLocalServer::future_scanEdge( +folly::SemiFuture GraphStorageLocalServer::semifuture_scanEdge( const cpp2::ScanEdgeRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ScanResponse, future_scanEdge); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ScanResponse, semifuture_scanEdge); } -folly::Future GraphStorageLocalServer::future_get( +folly::SemiFuture GraphStorageLocalServer::semifuture_get( const cpp2::KVGetRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::KVGetResponse, future_get); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::KVGetResponse, semifuture_get); } -folly::Future GraphStorageLocalServer::future_put( +folly::SemiFuture GraphStorageLocalServer::semifuture_put( const cpp2::KVPutRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_put); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ExecResponse, semifuture_put); } -folly::Future GraphStorageLocalServer::future_remove( +folly::SemiFuture GraphStorageLocalServer::semifuture_remove( const cpp2::KVRemoveRequest& request) { - LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_remove); + LOCAL_RETURN_SEMIFUTURE(threadManager_, cpp2::ExecResponse, semifuture_remove); } } // namespace nebula::storage diff --git a/src/storage/GraphStorageLocalServer.cpp b/src/storage/GraphStorageLocalServer.cpp index 697af47f436..d460d324324 100644 --- a/src/storage/GraphStorageLocalServer.cpp +++ b/src/storage/GraphStorageLocalServer.cpp @@ -15,7 +15,7 @@ using folly::exception_wrapper; #define LOCAL_RETURN_FUTURE(RespType, callFunc) \ auto promise = std::make_shared>(); \ - auto f = promise->getFuture(); \ + auto f = promise->getSemiFuture(); \ threadManager_->add([this, promise, request] { \ std::dynamic_pointer_cast(handler_) \ ->callFunc(std::move(request)) \ @@ -58,102 +58,102 @@ void GraphStorageLocalServer::stop() { serving_ = false; } -folly::Future GraphStorageLocalServer::future_getNeighbors( +folly::SemiFuture GraphStorageLocalServer::semifuture_getNeighbors( const cpp2::GetNeighborsRequest& request) { LOCAL_RETURN_FUTURE(cpp2::GetNeighborsResponse, future_getNeighbors); } -folly::Future GraphStorageLocalServer::future_addVertices( +folly::SemiFuture GraphStorageLocalServer::semifuture_addVertices( const cpp2::AddVerticesRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ExecResponse, future_addVertices); } -folly::Future GraphStorageLocalServer::future_chainAddEdges( +folly::SemiFuture GraphStorageLocalServer::semifuture_chainAddEdges( const cpp2::AddEdgesRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ExecResponse, future_chainAddEdges); } -folly::Future GraphStorageLocalServer::future_addEdges( +folly::SemiFuture GraphStorageLocalServer::semifuture_addEdges( const cpp2::AddEdgesRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ExecResponse, future_addEdges); } -folly::Future GraphStorageLocalServer::future_getProps( +folly::SemiFuture GraphStorageLocalServer::semifuture_getProps( const cpp2::GetPropRequest& request) { LOCAL_RETURN_FUTURE(cpp2::GetPropResponse, future_getProps); } -folly::Future GraphStorageLocalServer::future_deleteEdges( +folly::SemiFuture GraphStorageLocalServer::semifuture_deleteEdges( const cpp2::DeleteEdgesRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ExecResponse, future_deleteEdges); } -folly::Future GraphStorageLocalServer::future_chainDeleteEdges( +folly::SemiFuture GraphStorageLocalServer::semifuture_chainDeleteEdges( const cpp2::DeleteEdgesRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ExecResponse, future_chainDeleteEdges); } -folly::Future GraphStorageLocalServer::future_deleteVertices( +folly::SemiFuture GraphStorageLocalServer::semifuture_deleteVertices( const cpp2::DeleteVerticesRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ExecResponse, future_deleteVertices); } -folly::Future GraphStorageLocalServer::future_deleteTags( +folly::SemiFuture GraphStorageLocalServer::semifuture_deleteTags( const cpp2::DeleteTagsRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ExecResponse, future_deleteTags); } -folly::Future GraphStorageLocalServer::future_updateVertex( +folly::SemiFuture GraphStorageLocalServer::semifuture_updateVertex( const cpp2::UpdateVertexRequest& request) { LOCAL_RETURN_FUTURE(cpp2::UpdateResponse, future_updateVertex); } -folly::Future GraphStorageLocalServer::future_chainUpdateEdge( +folly::SemiFuture GraphStorageLocalServer::semifuture_chainUpdateEdge( const cpp2::UpdateEdgeRequest& request) { LOCAL_RETURN_FUTURE(cpp2::UpdateResponse, future_chainUpdateEdge); } -folly::Future GraphStorageLocalServer::future_updateEdge( +folly::SemiFuture GraphStorageLocalServer::semifuture_updateEdge( const cpp2::UpdateEdgeRequest& request) { LOCAL_RETURN_FUTURE(cpp2::UpdateResponse, future_updateEdge); } -folly::Future GraphStorageLocalServer::future_getUUID( +folly::SemiFuture GraphStorageLocalServer::semifuture_getUUID( const cpp2::GetUUIDReq& request) { LOCAL_RETURN_FUTURE(cpp2::GetUUIDResp, future_getUUID); } -folly::Future GraphStorageLocalServer::future_lookupIndex( +folly::SemiFuture GraphStorageLocalServer::semifuture_lookupIndex( const cpp2::LookupIndexRequest& request) { LOCAL_RETURN_FUTURE(cpp2::LookupIndexResp, future_lookupIndex); } -folly::Future GraphStorageLocalServer::future_lookupAndTraverse( +folly::SemiFuture GraphStorageLocalServer::semifuture_lookupAndTraverse( const cpp2::LookupAndTraverseRequest& request) { LOCAL_RETURN_FUTURE(cpp2::GetNeighborsResponse, future_lookupAndTraverse); } -folly::Future GraphStorageLocalServer::future_scanVertex( +folly::SemiFuture GraphStorageLocalServer::semifuture_scanVertex( const cpp2::ScanVertexRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ScanResponse, future_scanVertex); } -folly::Future GraphStorageLocalServer::future_scanEdge( +folly::SemiFuture GraphStorageLocalServer::semifuture_scanEdge( const cpp2::ScanEdgeRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ScanResponse, future_scanEdge); } -folly::Future GraphStorageLocalServer::future_get( +folly::SemiFuture GraphStorageLocalServer::semifuture_get( const cpp2::KVGetRequest& request) { LOCAL_RETURN_FUTURE(cpp2::KVGetResponse, future_get); } -folly::Future GraphStorageLocalServer::future_put( +folly::SemiFuture GraphStorageLocalServer::semifuture_put( const cpp2::KVPutRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ExecResponse, future_put); } -folly::Future GraphStorageLocalServer::future_remove( +folly::SemiFuture GraphStorageLocalServer::semifuture_remove( const cpp2::KVRemoveRequest& request) { LOCAL_RETURN_FUTURE(cpp2::ExecResponse, future_remove); } diff --git a/src/storage/GraphStorageLocalServer.h b/src/storage/GraphStorageLocalServer.h index 39d93929058..e2f2810627b 100644 --- a/src/storage/GraphStorageLocalServer.h +++ b/src/storage/GraphStorageLocalServer.h @@ -29,31 +29,39 @@ class GraphStorageLocalServer final : public boost::noncopyable, public nebula:: void serve(); public: - folly::Future future_getNeighbors( + folly::SemiFuture semifuture_getNeighbors( const cpp2::GetNeighborsRequest& request); - folly::Future future_addVertices(const cpp2::AddVerticesRequest& request); - folly::Future future_chainAddEdges(const cpp2::AddEdgesRequest& request); - folly::Future future_addEdges(const cpp2::AddEdgesRequest& request); - folly::Future future_getProps(const cpp2::GetPropRequest& request); - folly::Future future_deleteEdges(const cpp2::DeleteEdgesRequest& request); - folly::Future future_chainDeleteEdges( + folly::SemiFuture semifuture_addVertices( + const cpp2::AddVerticesRequest& request); + folly::SemiFuture semifuture_chainAddEdges( + const cpp2::AddEdgesRequest& request); + folly::SemiFuture semifuture_addEdges(const cpp2::AddEdgesRequest& request); + folly::SemiFuture semifuture_getProps(const cpp2::GetPropRequest& request); + folly::SemiFuture semifuture_deleteEdges( const cpp2::DeleteEdgesRequest& request); - folly::Future future_deleteVertices( + folly::SemiFuture semifuture_chainDeleteEdges( + const cpp2::DeleteEdgesRequest& request); + folly::SemiFuture semifuture_deleteVertices( const cpp2::DeleteVerticesRequest& request); - folly::Future future_deleteTags(const cpp2::DeleteTagsRequest& request); - folly::Future future_updateVertex(const cpp2::UpdateVertexRequest& request); - folly::Future future_chainUpdateEdge( + folly::SemiFuture semifuture_deleteTags( + const cpp2::DeleteTagsRequest& request); + folly::SemiFuture semifuture_updateVertex( + const cpp2::UpdateVertexRequest& request); + folly::SemiFuture semifuture_chainUpdateEdge( + const cpp2::UpdateEdgeRequest& request); + folly::SemiFuture semifuture_updateEdge( const cpp2::UpdateEdgeRequest& request); - folly::Future future_updateEdge(const cpp2::UpdateEdgeRequest& request); - folly::Future future_getUUID(const cpp2::GetUUIDReq& request); - folly::Future future_lookupIndex(const cpp2::LookupIndexRequest& request); - folly::Future future_lookupAndTraverse( + folly::SemiFuture semifuture_getUUID(const cpp2::GetUUIDReq& request); + folly::SemiFuture semifuture_lookupIndex( + const cpp2::LookupIndexRequest& request); + folly::SemiFuture semifuture_lookupAndTraverse( const cpp2::LookupAndTraverseRequest& request); - folly::Future future_scanVertex(const cpp2::ScanVertexRequest& request); - folly::Future future_scanEdge(const cpp2::ScanEdgeRequest& request); - folly::Future future_get(const cpp2::KVGetRequest& request); - folly::Future future_put(const cpp2::KVPutRequest& request); - folly::Future future_remove(const cpp2::KVRemoveRequest& request); + folly::SemiFuture semifuture_scanVertex( + const cpp2::ScanVertexRequest& request); + folly::SemiFuture semifuture_scanEdge(const cpp2::ScanEdgeRequest& request); + folly::SemiFuture semifuture_get(const cpp2::KVGetRequest& request); + folly::SemiFuture semifuture_put(const cpp2::KVPutRequest& request); + folly::SemiFuture semifuture_remove(const cpp2::KVRemoveRequest& request); private: GraphStorageLocalServer() = default; diff --git a/tests/Makefile b/tests/Makefile index 2b5013a0a32..39a73466ca1 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -25,8 +25,8 @@ PASSWORD_LOCK_TIME_IN_SECS ?= 0 # commands gherkin_fmt = ~/.local/bin/reformat-gherkin run_test = PYTHONPATH=$$PYTHONPATH:$(CURR_DIR)/.. $(CURR_DIR)/nebula-test-run.py -test_without_skip = python3 -m pytest -m "not skip" -test_without_skip_sa = python3 -m pytest -m "not skip and not distonly" +test_without_skip = python3 -m pytest -m "not skip" --build_dir=$(BUILD_DIR) +test_without_skip_sa = python3 -m pytest -m "not skip and not distonly" --build_dir=$(BUILD_DIR) test_j = $(test_without_skip) -n$(J) test_j_sa = $(test_without_skip_sa) -n$(J) diff --git a/third-party/install-gcc.sh b/third-party/install-gcc.sh index dcbcd254641..b05d7dd1f59 100755 --- a/third-party/install-gcc.sh +++ b/third-party/install-gcc.sh @@ -4,7 +4,7 @@ # # This source code is licensed under Apache 2.0 License. -# Usage: install-gcc.sh --prefix=/opt/nebula/toolset +# Usage: install-gcc.sh --prefix=/opt # Always use bash shell=$(basename $(readlink /proc/$$/exe))