From f3cf55b76b45c5ca1e7ac75fb1fb10e63d48f0dc Mon Sep 17 00:00:00 2001 From: CPWstatic <13495049+CPWstatic@users.noreply.github.com> Date: Tue, 25 Jan 2022 19:49:04 +0800 Subject: [PATCH] Fix the promise/future. --- src/graph/executor/query/TraverseExecutor.cpp | 24 +++++++++---------- src/graph/executor/query/TraverseExecutor.h | 5 ++-- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 9bb9033d433..17f492a0db6 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -80,11 +80,10 @@ folly::Future TraverseExecutor::traverse() { DataSet emptyResult; return finish(ResultBuilder().value(Value(std::move(emptyResult))).build()); } - getNeighbors(); - return promise_.getFuture(); + return getNeighbors(); } -void TraverseExecutor::getNeighbors() { +folly::Future TraverseExecutor::getNeighbors() { currentStep_++; time::Duration getNbrTime; StorageClient* storageClient = qctx_->getStorageClient(); @@ -93,7 +92,7 @@ void TraverseExecutor::getNeighbors() { qctx()->rctx()->session()->id(), qctx()->plan()->id(), qctx()->plan()->isProfileEnabled()); - storageClient + return storageClient ->getNeighbors(param, reqDs_.colNames, std::move(reqDs_.rows), @@ -112,7 +111,7 @@ void TraverseExecutor::getNeighbors() { .thenValue([this, getNbrTime](StorageRpcResponse&& resp) mutable { SCOPED_TIMER(&execTime_); addStats(resp, getNbrTime.elapsedInUSec()); - handleResponse(resp); + return handleResponse(std::move(resp)); }); } @@ -140,11 +139,11 @@ void TraverseExecutor::addStats(RpcResponse& resp, int64_t getNbrTimeInUSec) { otherStats_.emplace(folly::sformat("step {}", currentStep_), ss.str()); } -void TraverseExecutor::handleResponse(RpcResponse& resps) { +folly::Future TraverseExecutor::handleResponse(RpcResponse&& resps) { SCOPED_TIMER(&execTime_); auto result = handleCompleteness(resps, FLAGS_accept_partial_success); if (!result.ok()) { - promise_.setValue(std::move(result).status()); + return folly::makeFuture(std::move(result).status()); } auto& responses = resps.responses(); @@ -162,21 +161,20 @@ void TraverseExecutor::handleResponse(RpcResponse& resps) { auto status = buildInterimPath(iter.get()); if (!status.ok()) { - promise_.setValue(status); - return; + return folly::makeFuture(std::move(status)); } if (!isFinalStep()) { if (reqDs_.rows.empty()) { if (range_ != nullptr) { - promise_.setValue(buildResult()); + return folly::makeFuture(buildResult()); } else { - promise_.setValue(Status::OK()); + return folly::makeFuture(Status::OK()); } } else { - getNeighbors(); + return getNeighbors(); } } else { - promise_.setValue(buildResult()); + return folly::makeFuture(buildResult()); } } diff --git a/src/graph/executor/query/TraverseExecutor.h b/src/graph/executor/query/TraverseExecutor.h index ff4c96f3432..25ec9704ea9 100644 --- a/src/graph/executor/query/TraverseExecutor.h +++ b/src/graph/executor/query/TraverseExecutor.h @@ -41,9 +41,9 @@ class TraverseExecutor final : public StorageAccessExecutor { void addStats(RpcResponse& resps, int64_t getNbrTimeInUSec); - void getNeighbors(); + folly::Future getNeighbors(); - void handleResponse(RpcResponse& resps); + folly::Future handleResponse(RpcResponse&& resps); Status buildInterimPath(GetNeighborsIter* iter); @@ -74,7 +74,6 @@ class TraverseExecutor final : public StorageAccessExecutor { private: DataSet reqDs_; const Traverse* traverse_{nullptr}; - folly::Promise promise_; MatchStepRange* range_{nullptr}; size_t currentStep_{0}; std::list> paths_;