diff --git a/src/graph/context/Iterator.cpp b/src/graph/context/Iterator.cpp index 5b2b94ffac1..b195007d501 100644 --- a/src/graph/context/Iterator.cpp +++ b/src/graph/context/Iterator.cpp @@ -62,7 +62,7 @@ void GetNeighborsIter::goToFirstEdge() { ++currentRow_) { colIdx_ = currentDs_->colLowerBound + 1; while (colIdx_ < currentDs_->colUpperBound && !valid_) { - const auto& currentCol = currentRow_->operator[](colIdx_); + const auto& currentCol = (*currentRow_)[colIdx_]; if (!currentCol.isList() || currentCol.getList().empty()) { ++colIdx_; continue; @@ -229,7 +229,7 @@ void GetNeighborsIter::next() { while (++edgeIdx_ > -1) { if (edgeIdx_ < edgeIdxUpperBound_) { - const auto& currentEdge = currentCol_->operator[](edgeIdx_); + const auto& currentEdge = (*currentCol_)[edgeIdx_]; if (!currentEdge.isList()) { continue; } @@ -366,10 +366,10 @@ const Value& GetNeighborsIter::getTagProp(const std::string& tag, const std::str auto& row = *currentRow_; if (tag == "*") { for (auto& index : currentDs_->tagPropsMap) { - auto propIndex = index.second.propIndices.find(prop); - if (propIndex != index.second.propIndices.end()) { + auto propIndexIter = index.second.propIndices.find(prop); + if (propIndexIter != index.second.propIndices.end()) { colId = index.second.colIdx; - propId = propIndex->second; + propId = propIndexIter->second; DCHECK_GT(row.size(), colId); if (row[colId].empty()) { continue; @@ -393,12 +393,12 @@ const Value& GetNeighborsIter::getTagProp(const std::string& tag, const std::str if (index == tagPropIndices.end()) { return Value::kEmpty; } - auto propIndex = index->second.propIndices.find(prop); - if (propIndex == index->second.propIndices.end()) { + auto propIndexIter = index->second.propIndices.find(prop); + if (propIndexIter == index->second.propIndices.end()) { return Value::kEmpty; } colId = index->second.colIdx; - propId = propIndex->second; + propId = propIndexIter->second; DCHECK_GT(row.size(), colId); if (row[colId].empty()) { return Value::kEmpty; @@ -427,8 +427,7 @@ const Value& GetNeighborsIter::getEdgeProp(const std::string& edge, const std::s } auto index = currentDs_->edgePropsMap.find(currentEdge); if (index == currentDs_->edgePropsMap.end()) { - DLOG(INFO) << "No edge found: " << edge; - DLOG(INFO) << "Current edge: " << currentEdge; + DLOG(INFO) << "No edge found: " << edge << " Current edge: " << currentEdge; return Value::kEmpty; } auto propIndex = index->second.propIndices.find(prop); @@ -466,10 +465,9 @@ Value GetNeighborsIter::getVertex(const std::string& name) { Tag tag; tag.name = tagProp.first; for (size_t i = 0; i < propList.size(); ++i) { - if (tagPropNameList[i] == nebula::kTag) { - continue; + if (tagPropNameList[i] != nebula::kTag) { + tag.props.emplace(tagPropNameList[i], propList[i]); } - tag.props.emplace(tagPropNameList[i], propList[i]); } vertex.tags.emplace_back(std::move(tag)); } diff --git a/src/graph/executor/algo/BatchShortestPath.cpp b/src/graph/executor/algo/BatchShortestPath.cpp index 95ac030019d..1385191c61f 100644 --- a/src/graph/executor/algo/BatchShortestPath.cpp +++ b/src/graph/executor/algo/BatchShortestPath.cpp @@ -9,9 +9,12 @@ #include "sys/sysinfo.h" using nebula::storage::StorageClient; + DECLARE_uint32(num_path_thread); + namespace nebula { namespace graph { + folly::Future BatchShortestPath::execute(const HashSet& startVids, const HashSet& endVids, DataSet* result) { @@ -550,12 +553,13 @@ size_t BatchShortestPath::splitTask(const HashSet& startVids, const HashSet& end ++count; } } - std::stringstream ss; - ss << "{\n" - << "startVids' size : " << startVidsSize << " endVids's size : " << endVidsSize; - ss << " thread num : " << threadNum; - ss << " start blocks : " << startSlices << " end blocks : " << endSlices << "\n}"; - stats_->emplace(folly::sformat("split task "), ss.str()); + folly::dynamic obj = folly::dynamic::object(); + obj.insert("startVids' size", startVidsSize); + obj.insert("endVids's size", endVidsSize); + obj.insert("thread num", threadNum); + obj.insert("start blocks", startSlices); + obj.insert("end blocks", endSlices); + stats_->emplace("split task", folly::toPrettyJson(obj)); return startSlices * endSlices; } diff --git a/src/graph/executor/algo/BatchShortestPath.h b/src/graph/executor/algo/BatchShortestPath.h index 212a90929a5..9c9f50531b0 100644 --- a/src/graph/executor/algo/BatchShortestPath.h +++ b/src/graph/executor/algo/BatchShortestPath.h @@ -9,6 +9,7 @@ namespace nebula { namespace graph { + class BatchShortestPath final : public ShortestPathBase { public: BatchShortestPath(const ShortestPath* node, diff --git a/src/graph/executor/algo/ShortestPathBase.cpp b/src/graph/executor/algo/ShortestPathBase.cpp index e809f27f72b..0d7c7b21246 100644 --- a/src/graph/executor/algo/ShortestPathBase.cpp +++ b/src/graph/executor/algo/ShortestPathBase.cpp @@ -173,13 +173,17 @@ void ShortestPathBase::addStats(RpcResponse& resp, if (result.vertices_ref().has_value()) { size = (*result.vertices_ref()).size(); } - auto info = util::collectRespProfileData(result.result, hostLatency[i], size, timeInUSec); + auto info = util::collectRespProfileData(result.result, hostLatency[i], size); stats.push_back(std::move(info)); } + folly::dynamic stepObj = folly::dynamic::object(); + stepObj.insert("total_rpc_time", folly::sformat("{}(us)", timeInUSec)); + stepObj.insert("storage", stats); + auto key = folly::sformat("{}step[{}]", reverse ? "reverse " : "", stepNum); statsLock_.lock(); - stats_->emplace(key, folly::toPrettyJson(stats)); + stats_->emplace(key, folly::toPrettyJson(stepObj)); statsLock_.unlock(); } @@ -188,12 +192,16 @@ void ShortestPathBase::addStats(PropRpcResponse& resp, int64_t timeInUSec) const auto& hostLatency = resp.hostLatency(); for (size_t i = 0; i < hostLatency.size(); ++i) { const auto& result = resp.responses()[i].get_result(); - auto info = util::collectRespProfileData(result, hostLatency[i], 0, timeInUSec); + auto info = util::collectRespProfileData(result, hostLatency[i], 0); stats.push_back(std::move(info)); } + folly::dynamic propObj = folly::dynamic::object(); + propObj.insert("storage", stats); + propObj.insert("total_rpc_time", folly::sformat("{}(us)", timeInUSec)); + statsLock_.lock(); - stats_->emplace("get_prop", folly::toPrettyJson(stats)); + stats_->emplace("get_prop", folly::toPrettyJson(propObj)); statsLock_.unlock(); } diff --git a/src/graph/executor/algo/ShortestPathBase.h b/src/graph/executor/algo/ShortestPathBase.h index d107f7fbbcd..84d4620375c 100644 --- a/src/graph/executor/algo/ShortestPathBase.h +++ b/src/graph/executor/algo/ShortestPathBase.h @@ -13,8 +13,10 @@ using nebula::storage::StorageRpcResponse; using nebula::storage::cpp2::GetNeighborsResponse; using RpcResponse = StorageRpcResponse; using PropRpcResponse = StorageRpcResponse; + namespace nebula { namespace graph { + class ShortestPathBase { public: using HashSet = robin_hood::unordered_flat_set>; diff --git a/src/graph/executor/algo/ShortestPathExecutor.cpp b/src/graph/executor/algo/ShortestPathExecutor.cpp index 98381a9e72a..371c463d9ae 100644 --- a/src/graph/executor/algo/ShortestPathExecutor.cpp +++ b/src/graph/executor/algo/ShortestPathExecutor.cpp @@ -13,8 +13,10 @@ using nebula::storage::StorageClient; DEFINE_uint32(num_path_thread, 0, "number of concurrent threads when do shortest path"); + namespace nebula { namespace graph { + folly::Future ShortestPathExecutor::execute() { // MemoryTrackerVerified SCOPED_TIMER(&execTime_); diff --git a/src/graph/executor/algo/SingleShortestPath.cpp b/src/graph/executor/algo/SingleShortestPath.cpp index fd509d765f6..ed0d0cce4b6 100644 --- a/src/graph/executor/algo/SingleShortestPath.cpp +++ b/src/graph/executor/algo/SingleShortestPath.cpp @@ -10,6 +10,7 @@ using nebula::storage::StorageClient; namespace nebula { namespace graph { + folly::Future SingleShortestPath::execute(const HashSet& startVids, const HashSet& endVids, DataSet* result) { diff --git a/src/graph/executor/algo/SingleShortestPath.h b/src/graph/executor/algo/SingleShortestPath.h index 10c083028e3..b3944e5f75a 100644 --- a/src/graph/executor/algo/SingleShortestPath.h +++ b/src/graph/executor/algo/SingleShortestPath.h @@ -9,6 +9,7 @@ namespace nebula { namespace graph { + class SingleShortestPath final : public ShortestPathBase { public: using HashSet = robin_hood::unordered_flat_set>; diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 850820bf1fa..e1c28b8c13a 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -27,7 +27,10 @@ folly::Future TraverseExecutor::execute() { DataSet emptyDs; return finish(ResultBuilder().value(Value(std::move(emptyDs))).build()); } - return getNeighbors(); + return getNeighbors().ensure([this]() { + // fill some profile time stats + otherStats_.emplace("expandTime", folly::sformat("{}(us)", expandTime_)); + }); } Status TraverseExecutor::buildRequestVids() { @@ -134,24 +137,24 @@ void TraverseExecutor::addStats(RpcResponse& resp, int64_t getNbrTimeInUSec) { if (result.vertices_ref().has_value()) { size = (*result.vertices_ref()).size(); } - auto info = util::collectRespProfileData(result.result, hostLatency[i], size, getNbrTimeInUSec); + auto info = util::collectRespProfileData(result.result, hostLatency[i], size); stepInfo.push_back(std::move(info)); } - otherStats_.emplace(folly::sformat("step[{}]", currentStep_), folly::toPrettyJson(stepInfo)); + folly::dynamic stepObj = folly::dynamic::object(); + stepObj.insert("storage", stepInfo); + stepObj.insert("total_rpc_time", folly::sformat("{}(us)", getNbrTimeInUSec)); + otherStats_.emplace(folly::sformat("step[{}]", currentStep_), folly::toPrettyJson(stepObj)); } folly::Future TraverseExecutor::handleResponse(RpcResponse&& resps) { - auto result = handleCompleteness(resps, FLAGS_accept_partial_success); - if (!result.ok()) { - return folly::makeFuture(std::move(result).status()); - } + NG_RETURN_IF_ERROR(handleCompleteness(resps, FLAGS_accept_partial_success)); + List list; for (auto& resp : resps.responses()) { auto dataset = resp.get_vertices(); - if (dataset == nullptr) { - continue; + if (dataset) { + list.values.emplace_back(std::move(*dataset)); } - list.values.emplace_back(std::move(*dataset)); } auto listVal = std::make_shared(std::move(list)); auto iter = std::make_unique(listVal); @@ -170,19 +173,17 @@ folly::Future TraverseExecutor::handleResponse(RpcResponse&& resps) { result_.rows = buildZeroStepPath(); } } + expand(iter.get()); - if (!isFinalStep()) { - if (vids_.empty()) { - return buildResult(); - } else { - return getNeighbors(); - } - } else { - return buildResult(); + + if (!isFinalStep() && !vids_.empty()) { + return getNeighbors(); } + return buildResult(); } void TraverseExecutor::expand(GetNeighborsIter* iter) { + SCOPED_TIMER(&expandTime_); if (iter->numRows() == 0) { return; } @@ -359,28 +360,9 @@ std::vector TraverseExecutor::buildPath(const Value& initVertex, if (maxStep == 1) { if (traverse_->trackPrevPath()) { - std::vector newResult; - auto dstIter = dst2PathsMap_.find(initVertex); - if (dstIter == dst2PathsMap_.end()) { - return std::vector(); - } - auto& prevPaths = dstIter->second; - for (auto& prevPath : prevPaths) { - for (auto& p : result) { - if (filterSameEdge(prevPath, p)) { - continue; - } - // copy - Row row = prevPath; - row.values.emplace_back(p.values.front()); - row.values.emplace_back(p.values.back()); - newResult.emplace_back(std::move(row)); - } - } - return newResult; - } else { - return result; + return joinPrevPath(initVertex, result); } + return result; } size_t step = 2; @@ -440,85 +422,74 @@ std::vector TraverseExecutor::buildPath(const Value& initVertex, std::make_move_iterator(result.end())); } if (traverse_->trackPrevPath()) { - std::vector newPaths; - auto dstIter = dst2PathsMap_.find(initVertex); - if (dstIter != dst2PathsMap_.end()) { - auto& prevPaths = dstIter->second; - for (auto& prevPath : prevPaths) { - for (auto& p : newResult) { - if (filterSameEdge(prevPath, p)) { - continue; - } - // copy - Row row = prevPath; - row.values.emplace_back(p.values.front()); - row.values.emplace_back(p.values.back()); - newPaths.emplace_back(std::move(row)); - } + return joinPrevPath(initVertex, newResult); + } + return newResult; +} + +std::vector TraverseExecutor::joinPrevPath(const Value& initVertex, + const std::vector& newResult) const { + auto dstIter = dst2PathsMap_.find(initVertex); + if (dstIter == dst2PathsMap_.end()) { + return std::vector(); + } + + std::vector newPaths; + for (auto& prevPath : dstIter->second) { + for (auto& p : newResult) { + if (!hasSameEdgeInPath(prevPath, p)) { + // copy + Row row = prevPath; + row.values.emplace_back(p.values.front()); + row.values.emplace_back(p.values.back()); + newPaths.emplace_back(std::move(row)); } - return newPaths; - } else { - return std::vector(); } - } else { - return newResult; } + return newPaths; } -bool TraverseExecutor::hasSameEdge(const std::vector& edgeList, const Edge& edge) { - for (auto& leftEdge : edgeList) { - if (!leftEdge.isEdge()) { - continue; - } - if (edge.keyEqual(leftEdge.getEdge())) { +bool TraverseExecutor::hasSameEdge(const std::vector& edgeList, const Edge& edge) const { + for (const auto& leftEdge : edgeList) { + if (leftEdge.isEdge() && leftEdge.getEdge().keyEqual(edge)) { return true; } } return false; } -bool TraverseExecutor::filterSameEdge(const Row& lhs, - const Row& rhs, - std::unordered_set* uniqueEdge) { - if (uniqueEdge) { - for (auto& rightListVal : rhs.values) { - if (!rightListVal.isList()) { - continue; +bool TraverseExecutor::hasSameEdgeInPath(const Row& lhs, const Row& rhs) const { + for (const auto& leftListVal : lhs.values) { + if (leftListVal.isList()) { + auto& leftList = leftListVal.getList().values; + for (auto& rightListVal : rhs.values) { + if (rightListVal.isList()) { + auto& rightList = rightListVal.getList().values; + for (auto& edgeVal : rightList) { + if (edgeVal.isEdge() && hasSameEdge(leftList, edgeVal.getEdge())) { + return true; + } + } + } } + } + } + return false; +} + +bool TraverseExecutor::hasSameEdgeInSet(const Row& rhs, + const std::unordered_set& uniqueEdge) const { + for (const auto& rightListVal : rhs.values) { + if (rightListVal.isList()) { auto& rightList = rightListVal.getList().values; for (auto& edgeVal : rightList) { - if (!edgeVal.isEdge()) { - continue; - } - if (uniqueEdge->find(edgeVal) != uniqueEdge->end()) { + if (edgeVal.isEdge() && uniqueEdge.find(edgeVal) != uniqueEdge.end()) { return true; } } } - return false; - } else { - for (auto& leftListVal : lhs.values) { - if (!leftListVal.isList()) { - continue; - } - auto& leftList = leftListVal.getList().values; - for (auto& rightListVal : rhs.values) { - if (!rightListVal.isList()) { - continue; - } - auto& rightList = rightListVal.getList().values; - for (auto& edgeVal : rightList) { - if (!edgeVal.isEdge()) { - continue; - } - if (hasSameEdge(leftList, edgeVal.getEdge())) { - return true; - } - } - } - } - return false; } + return false; } } // namespace graph diff --git a/src/graph/executor/query/TraverseExecutor.h b/src/graph/executor/query/TraverseExecutor.h index ecaa7e67fa2..c54f9877528 100644 --- a/src/graph/executor/query/TraverseExecutor.h +++ b/src/graph/executor/query/TraverseExecutor.h @@ -10,6 +10,7 @@ #include "graph/executor/StorageAccessExecutor.h" #include "graph/planner/plan/Query.h" #include "interface/gen-cpp2/storage_types.h" + // only used in match scenarios // invoke the getNeighbors interface, according to the number of times specified by the user, // and assemble the result into paths @@ -35,6 +36,7 @@ // `getNeighbors` : invoke the getNeightbors interface // `releasePrevPaths` : deleted The path whose length does not meet the user-defined length // `hasSameEdge` : check if there are duplicate edges in path + namespace nebula { namespace graph { @@ -63,18 +65,16 @@ class TraverseExecutor final : public StorageAccessExecutor { folly::Future buildResult(); std::vector buildPath(const Value& initVertex, size_t minStep, size_t maxStep); - folly::Future buildPathMultiJobs(size_t minStep, size_t maxStep); + std::vector joinPrevPath(const Value& initVertex, const std::vector& newResult) const; bool isFinalStep() const { return currentStep_ == range_.max() || range_.max() == 0; } - bool filterSameEdge(const Row& lhs, - const Row& rhs, - std::unordered_set* uniqueEdge = nullptr); - - bool hasSameEdge(const std::vector& edgeList, const Edge& edge); + bool hasSameEdge(const std::vector& edgeList, const Edge& edge) const; + bool hasSameEdgeInPath(const Row& lhs, const Row& rhs) const; + bool hasSameEdgeInSet(const Row& rhs, const std::unordered_set& uniqueEdge) const; std::vector buildZeroStepPath(); @@ -134,6 +134,8 @@ class TraverseExecutor final : public StorageAccessExecutor { const Traverse* traverse_{nullptr}; MatchStepRange range_; size_t currentStep_{0}; + + size_t expandTime_{0u}; }; } // namespace graph diff --git a/src/graph/util/Utils.cpp b/src/graph/util/Utils.cpp index 6442a336bc8..ec69667fdf9 100644 --- a/src/graph/util/Utils.cpp +++ b/src/graph/util/Utils.cpp @@ -18,8 +18,7 @@ folly::dynamic getStorageDetail(const std::map& profileDet folly::dynamic collectRespProfileData(const storage::cpp2::ResponseCommon& resp, const std::tuple& info, - size_t numVertices, - size_t totalRpcTime) { + size_t numVertices) { folly::dynamic stat = folly::dynamic::object(); stat.insert("host", std::get<0>(info).toRawString()); stat.insert("exec", folly::sformat("{}(us)", std::get<1>(info))); @@ -27,9 +26,6 @@ folly::dynamic collectRespProfileData(const storage::cpp2::ResponseCommon& resp, if (numVertices > 0) { stat.insert("vertices", numVertices); } - if (totalRpcTime > 0) { - stat.insert("total_rpc_time", folly::sformat("{}(us)", totalRpcTime)); - } if (resp.latency_detail_us_ref().has_value()) { stat.insert("storage_detail", getStorageDetail(*resp.get_latency_detail_us())); } diff --git a/src/graph/util/Utils.h b/src/graph/util/Utils.h index c123212bf4f..6d0829f4343 100644 --- a/src/graph/util/Utils.h +++ b/src/graph/util/Utils.h @@ -35,8 +35,7 @@ folly::dynamic getStorageDetail(const std::map& profileDet folly::dynamic collectRespProfileData(const storage::cpp2::ResponseCommon& resp, const std::tuple& info, - size_t numVertices = 0UL, - size_t totalRpcTime = 0UL); + size_t numVertices = 0UL); } // namespace nebula::graph::util