diff --git a/src/clients/storage/InternalStorageClient.cpp b/src/clients/storage/InternalStorageClient.cpp index ce1f4be8b1c..f7e9457bfe7 100644 --- a/src/clients/storage/InternalStorageClient.cpp +++ b/src/clients/storage/InternalStorageClient.cpp @@ -10,6 +10,8 @@ namespace nebula { namespace storage { +constexpr int32_t kInternalPortOffset = -2; + template ::nebula::cpp2::ErrorCode getErrorCode(T& tryResp) { if (!tryResp.hasValue()) { diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index c9b530595a5..0cb5b32d377 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -106,20 +106,20 @@ StorageClientBase::collectResponse( memory::MemoryCheckGuard guard; StorageRpcResponse rpcResp(resps.size()); for (size_t i = 0; i < resps.size(); i++) { - auto& host = hosts->at(i); + const auto& host = hosts->at(i); folly::Try>& tryResp = resps[i]; if (tryResp.hasException()) { std::string errMsg = tryResp.exception().what().toStdString(); rpcResp.markFailure(); LOG(ERROR) << "There some RPC errors: " << errMsg; - auto req = requests.at(host); - auto parts = getReqPartsId(req); + const auto& req = requests.at(host); + const auto& parts = getReqPartsId(req); rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE); } else { StatusOr status = std::move(tryResp).value(); if (status.ok()) { auto resp = std::move(status).value(); - auto result = resp.get_result(); + const auto& result = resp.get_result(); if (!result.get_failed_parts().empty()) { rpcResp.markFailure(); @@ -141,8 +141,8 @@ StorageClientBase::collectResponse( ? nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED : nebula::cpp2::ErrorCode::E_RPC_FAILURE; LOG(ERROR) << "There some RPC errors: " << s.message(); - auto req = requests.at(host); - auto parts = getReqPartsId(req); + const auto& req = requests.at(host); + const auto& parts = getReqPartsId(req); rpcResp.appendFailedParts(parts, errorCode); } } diff --git a/src/clients/storage/StorageClientBase.h b/src/clients/storage/StorageClientBase.h index b120dec009e..4be52412e61 100644 --- a/src/clients/storage/StorageClientBase.h +++ b/src/clients/storage/StorageClientBase.h @@ -20,8 +20,6 @@ DECLARE_int32(storage_client_timeout_ms); DECLARE_uint32(storage_client_retry_interval_ms); -constexpr int32_t kInternalPortOffset = -2; - namespace nebula { namespace storage { @@ -34,21 +32,18 @@ class StorageRpcResponse final { }; explicit StorageRpcResponse(size_t reqsSent) : totalReqsSent_(reqsSent) { - lock_ = std::make_unique(); + responses_.reserve(reqsSent); } bool succeeded() const { - std::lock_guard g(*lock_); return result_ == Result::ALL_SUCCEEDED; } int32_t maxLatency() const { - std::lock_guard g(*lock_); return maxLatency_; } - void setLatency(HostAddr host, int32_t latency, int32_t e2eLatency) { - std::lock_guard g(*lock_); + void setLatency(const HostAddr& host, int32_t latency, int32_t e2eLatency) { if (latency > maxLatency_) { maxLatency_ = latency; } @@ -56,26 +51,22 @@ class StorageRpcResponse final { } void markFailure() { - std::lock_guard g(*lock_); result_ = Result::PARTIAL_SUCCEEDED; ++failedReqs_; } // A value between [0, 100], representing a percentage int32_t completeness() const { - std::lock_guard g(*lock_); DCHECK_NE(totalReqsSent_, 0); return totalReqsSent_ == 0 ? 0 : (totalReqsSent_ - failedReqs_) * 100 / totalReqsSent_; } void emplaceFailedPart(PartitionID partId, nebula::cpp2::ErrorCode errorCode) { - std::lock_guard g(*lock_); failedParts_.emplace(partId, errorCode); } void appendFailedParts(const std::vector& partsId, nebula::cpp2::ErrorCode errorCode) { - std::lock_guard g(*lock_); failedParts_.reserve(failedParts_.size() + partsId.size()); for (const auto& partId : partsId) { failedParts_.emplace(partId, errorCode); @@ -83,32 +74,26 @@ class StorageRpcResponse final { } void addResponse(Response&& resp) { - std::lock_guard g(*lock_); responses_.emplace_back(std::move(resp)); } - // Not thread-safe. const std::unordered_map& failedParts() const { return failedParts_; } - // Not thread-safe. std::vector& responses() { return responses_; } - // Not thread-safe. const std::vector& responses() const { return responses_; } - // Not thread-safe. const std::vector>& hostLatency() const { return hostLatency_; } private: - std::unique_ptr lock_; const size_t totalReqsSent_; size_t failedReqs_{0}; diff --git a/src/graph/context/iterator/PropIter.cpp b/src/graph/context/iterator/PropIter.cpp index 5daa5807ee4..286cc1fe1bd 100644 --- a/src/graph/context/iterator/PropIter.cpp +++ b/src/graph/context/iterator/PropIter.cpp @@ -93,16 +93,13 @@ const Value& PropIter::getProp(const std::string& name, const std::string& prop) if (name == "*") { for (auto& index : propsMap) { auto propIndex = index.second.find(prop); - if (propIndex == index.second.end()) { - continue; - } - colId = propIndex->second; - DCHECK_GT(row.size(), colId); - auto& val = row[colId]; - if (val.empty()) { - continue; - } else { - return val; + if (propIndex != index.second.end()) { + colId = propIndex->second; + DCHECK_GT(row.size(), colId); + auto& val = row[colId]; + if (!val.empty()) { + return val; + } } } return Value::kNullValue; @@ -153,9 +150,7 @@ Value PropIter::getVertex(const std::string& name) { Tag tag; tag.name = tagProp.first; for (auto& propIndex : tagProp.second) { - if (propIndex.first == nebula::kTag) { // "_tag" - continue; - } else { + if (propIndex.first != nebula::kTag) { // "_tag" tag.props.emplace(propIndex.first, row[propIndex.second]); } } diff --git a/src/graph/executor/Executor.h b/src/graph/executor/Executor.h index ba9f90970cd..d4e27a8ad69 100644 --- a/src/graph/executor/Executor.h +++ b/src/graph/executor/Executor.h @@ -123,6 +123,13 @@ class Executor : private boost::noncopyable, private cpp::NonMovable { class GatherFunc> auto runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator *iter); + void addState(const std::string &name, size_t durationInUs) { + otherStats_.emplace(name, folly::sformat("{}(us)", durationInUs)); + } + void addState(const std::string &name, folly::dynamic json) { + otherStats_.emplace(name, folly::toPrettyJson(json)); + } + int64_t id_; // Executor name diff --git a/src/graph/executor/StorageAccessExecutor.h b/src/graph/executor/StorageAccessExecutor.h index a3881325f7c..c8c974435c6 100644 --- a/src/graph/executor/StorageAccessExecutor.h +++ b/src/graph/executor/StorageAccessExecutor.h @@ -148,12 +148,11 @@ class StorageAccessExecutor : public Executor { } template - void addStats(storage::StorageRpcResponse &resp, - std::unordered_map &stats) const { + void addStats(storage::StorageRpcResponse &resp) { auto &hostLatency = resp.hostLatency(); for (size_t i = 0; i < hostLatency.size(); ++i) { auto info = util::collectRespProfileData(resp.responses()[i].get_result(), hostLatency[i]); - stats.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info)); + addState(folly::sformat("resp[{}]", i), std::move(info)); } } diff --git a/src/graph/executor/query/AppendVerticesExecutor.cpp b/src/graph/executor/query/AppendVerticesExecutor.cpp index ae420b28ad4..73dfb3e074a 100644 --- a/src/graph/executor/query/AppendVerticesExecutor.cpp +++ b/src/graph/executor/query/AppendVerticesExecutor.cpp @@ -9,9 +9,12 @@ using nebula::storage::StorageClient; using nebula::storage::StorageRpcResponse; using nebula::storage::cpp2::GetPropResponse; + DECLARE_bool(optimize_appendvertices); + namespace nebula { namespace graph { + folly::Future AppendVerticesExecutor::execute() { return appendVertices(); } @@ -56,15 +59,12 @@ folly::Future AppendVerticesExecutor::appendVertices() { av->limit(qctx()), av->filter()) .via(runner()) - .ensure([this, getPropsTime]() { - SCOPED_TIMER(&execTime_); - otherStats_.emplace("total_rpc", folly::sformat("{}(us)", getPropsTime.elapsedInUSec())); - }) - .thenValue([this](StorageRpcResponse &&rpcResp) { + .thenValue([this, getPropsTime](StorageRpcResponse &&rpcResp) { // MemoryTrackerVerified memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); - addStats(rpcResp, otherStats_); + addState("total_rpc", getPropsTime.elapsedInUSec()); + addStats(rpcResp); if (FLAGS_max_job_size <= 1) { return folly::makeFuture(handleResp(std::move(rpcResp))); } else { @@ -150,12 +150,11 @@ Status AppendVerticesExecutor::handleResp( bool mv = movable(av->inputVars().front()); for (; inputIter->valid(); inputIter->next()) { auto dstFound = map.find(src->eval(ctx(inputIter.get()))); - if (dstFound == map.end()) { - continue; + if (dstFound != map.end()) { + Row row = mv ? inputIter->moveRow() : *inputIter->row(); + row.values.emplace_back(dstFound->second); + ds.rows.emplace_back(std::move(row)); } - Row row = mv ? inputIter->moveRow() : *inputIter->row(); - row.values.emplace_back(dstFound->second); - ds.rows.emplace_back(std::move(row)); } return finish(ResultBuilder().value(Value(std::move(ds))).state(state).build()); } @@ -175,9 +174,7 @@ folly::Future AppendVerticesExecutor::handleRespMultiJobs( if (resp.props_ref().has_value()) { auto &&respV = std::move(*resp.props_ref()); v.colNames = respV.colNames; - v.rows.insert(v.rows.end(), - std::make_move_iterator(respV.begin()), - std::make_move_iterator(respV.end())); + std::move(respV.begin(), respV.end(), std::back_inserter(v.rows)); } } auto propIter = PropIter(std::make_shared(std::move(v))); @@ -191,10 +188,9 @@ folly::Future AppendVerticesExecutor::handleRespMultiJobs( auto gather = [this](auto &&results) -> Status { memory::MemoryCheckGuard guard; for (auto &r : results) { + NG_RETURN_IF_ERROR(r); auto &&rows = std::move(r).value(); - result_.rows.insert(result_.rows.end(), - std::make_move_iterator(rows.begin()), - std::make_move_iterator(rows.end())); + std::move(rows.begin(), rows.end(), std::back_inserter(result_.rows)); } return finish(ResultBuilder().value(Value(std::move(result_))).build()); }; @@ -219,10 +215,9 @@ folly::Future AppendVerticesExecutor::handleRespMultiJobs( auto gatherFinal = [this](auto &&results) -> Status { memory::MemoryCheckGuard guard2; for (auto &r : results) { + NG_RETURN_IF_ERROR(r); auto &&rows = std::move(r).value(); - result_.rows.insert(result_.rows.end(), - std::make_move_iterator(rows.begin()), - std::make_move_iterator(rows.end())); + std::move(rows.begin(), rows.end(), std::back_inserter(result_.rows)); } return finish(ResultBuilder().value(Value(std::move(result_))).build()); }; @@ -280,12 +275,11 @@ DataSet AppendVerticesExecutor::handleJob(size_t begin, size_t end, Iterator *it QueryExpressionContext ctx(qctx()->ectx()); for (; iter->valid() && begin++ < end; iter->next()) { auto dstFound = dsts_.find(src->eval(ctx(iter))); - if (dstFound == dsts_.end()) { - continue; + if (dstFound != dsts_.end()) { + Row row = *iter->row(); + row.values.emplace_back(dstFound->second); + ds.rows.emplace_back(std::move(row)); } - Row row = *iter->row(); - row.values.emplace_back(dstFound->second); - ds.rows.emplace_back(std::move(row)); } return ds; diff --git a/src/graph/executor/query/GetEdgesExecutor.cpp b/src/graph/executor/query/GetEdgesExecutor.cpp index eade60d9e2b..f7fd043a2b5 100644 --- a/src/graph/executor/query/GetEdgesExecutor.cpp +++ b/src/graph/executor/query/GetEdgesExecutor.cpp @@ -103,7 +103,7 @@ folly::Future GetEdgesExecutor::getEdges() { .thenValue([this, ge](StorageRpcResponse &&rpcResp) { memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); - addStats(rpcResp, otherStats_); + addStats(rpcResp); return handleResp(std::move(rpcResp), ge->colNames()); }); } diff --git a/src/graph/executor/query/GetVerticesExecutor.cpp b/src/graph/executor/query/GetVerticesExecutor.cpp index 0d250044d9a..27f16de84ea 100644 --- a/src/graph/executor/query/GetVerticesExecutor.cpp +++ b/src/graph/executor/query/GetVerticesExecutor.cpp @@ -53,7 +53,7 @@ folly::Future GetVerticesExecutor::getVertices() { .thenValue([this, gv](StorageRpcResponse &&rpcResp) { memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); - addStats(rpcResp, otherStats_); + addStats(rpcResp); return handleResp(std::move(rpcResp), gv->colNames()); }); } diff --git a/src/graph/executor/query/IndexScanExecutor.cpp b/src/graph/executor/query/IndexScanExecutor.cpp index 8bbfa8addd5..4341a69a49c 100644 --- a/src/graph/executor/query/IndexScanExecutor.cpp +++ b/src/graph/executor/query/IndexScanExecutor.cpp @@ -44,7 +44,7 @@ folly::Future IndexScanExecutor::indexScan() { .thenValue([this](StorageRpcResponse &&rpcResp) { // MemoryTrackerVerified memory::MemoryCheckGuard guard; - addStats(rpcResp, otherStats_); + addStats(rpcResp); return handleResp(std::move(rpcResp)); }); } diff --git a/src/graph/executor/query/ScanEdgesExecutor.cpp b/src/graph/executor/query/ScanEdgesExecutor.cpp index 769567620b2..7ff36758906 100644 --- a/src/graph/executor/query/ScanEdgesExecutor.cpp +++ b/src/graph/executor/query/ScanEdgesExecutor.cpp @@ -43,7 +43,7 @@ folly::Future ScanEdgesExecutor::scanEdges() { .thenValue([this](StorageRpcResponse &&rpcResp) { memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); - addStats(rpcResp, otherStats_); + addStats(rpcResp); return handleResp(std::move(rpcResp), {}); }); } diff --git a/src/graph/executor/query/ScanVerticesExecutor.cpp b/src/graph/executor/query/ScanVerticesExecutor.cpp index ba15ced7412..130ecf4712b 100644 --- a/src/graph/executor/query/ScanVerticesExecutor.cpp +++ b/src/graph/executor/query/ScanVerticesExecutor.cpp @@ -44,7 +44,7 @@ folly::Future ScanVerticesExecutor::scanVertices() { .thenValue([this, sv](StorageRpcResponse &&rpcResp) { memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); - addStats(rpcResp, otherStats_); + addStats(rpcResp); return handleResp(std::move(rpcResp), sv->colNames()); }); } diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 74fc8298768..3c34b8c6255 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -107,7 +107,7 @@ folly::Future TraverseExecutor::getNeighbors() { addStats(resp, getNbrTime.elapsedInUSec()); time::Duration expandTime; return handleResponse(std::move(resp)).ensure([this, expandTime]() { - otherStats_.emplace("expandTime", folly::sformat("{}(us)", expandTime.elapsedInUSec())); + addState("expandTime", expandTime.elapsedInUSec()); }); }) .thenValue([this](Status s) -> folly::Future { @@ -149,7 +149,7 @@ void TraverseExecutor::addStats(RpcResponse& resp, int64_t getNbrTimeInUSec) { folly::dynamic stepObj = folly::dynamic::object(); stepObj.insert("storage", stepInfo); stepObj.insert("total_rpc_time", folly::sformat("{}(us)", getNbrTimeInUSec)); - otherStats_.emplace(folly::sformat("step[{}]", currentStep_), folly::toPrettyJson(stepObj)); + addState(folly::sformat("step[{}]", currentStep_), std::move(stepObj)); } size_t TraverseExecutor::numRowsOfRpcResp(const RpcResponse& resps) const { @@ -240,13 +240,12 @@ folly::Future TraverseExecutor::asyncExpandOneStep(RpcResponse&& resps) } } - auto t = postTaskTime.elapsedInUSec(); - otherStats_.emplace("expandPostTaskTime", folly::sformat("{}(us)", t)); + addState("expandPostTaskTime", postTaskTime.elapsedInUSec()); folly::dynamic taskRunTimeArray = folly::dynamic::array(); for (auto time : *taskRunTime) { taskRunTimeArray.push_back(time); } - otherStats_.emplace("expandTaskRunTime", folly::toPrettyJson(taskRunTimeArray)); + addState("expandTaskRunTime", std::move(taskRunTimeArray)); return Status::OK(); });