diff --git a/src/graph/executor/StorageAccessExecutor.cpp b/src/graph/executor/StorageAccessExecutor.cpp index a1d691dc3ea..85061bc57c7 100644 --- a/src/graph/executor/StorageAccessExecutor.cpp +++ b/src/graph/executor/StorageAccessExecutor.cpp @@ -225,9 +225,8 @@ void StorageAccessExecutor::addGetNeighborStats(RpcResponse &resp, size_t stepNu } auto key = folly::sformat("{}step[{}]", reverse ? "reverse " : "", stepNum); - statsLock_.lock(); + std::lock_guard lk(statsLock_); otherStats_.emplace(key, folly::toPrettyJson(stats)); - statsLock_.unlock(); } } // namespace graph diff --git a/src/graph/executor/algo/AllPathsExecutor.cpp b/src/graph/executor/algo/AllPathsExecutor.cpp index ebbabdfef3f..f70c07ecc93 100644 --- a/src/graph/executor/algo/AllPathsExecutor.cpp +++ b/src/graph/executor/algo/AllPathsExecutor.cpp @@ -158,72 +158,22 @@ folly::Future AllPathsExecutor::getNeighbors(bool reverse) { auto iter = std::make_unique(listVal); time::Duration buildAdjTime; auto key = folly::sformat("buildAdjTime {}step[{}]", reverse ? "reverse " : "", step); - if (reverse) { - rightNextStepVids_.clear(); - expandFromRight(iter.get()); - } else { - leftNextStepVids_.clear(); - expandFromLeft(iter.get()); + expand(iter.get(), reverse); + { + std::lock_guard lk(statsLock_); + otherStats_.emplace(key, folly::sformat("{}(us)", buildAdjTime.elapsedInUSec())); } - otherStats_.emplace(key, folly::sformat("{}(us)", buildAdjTime.elapsedInUSec())); return Status::OK(); }); } -void AllPathsExecutor::expandFromRight(GetNeighborsIter* iter) { - if (iter->numRows() == 0) { - return; - } - auto* stepFilter = pathNode_->stepFilter(); - QueryExpressionContext ctx(ectx_); - - std::unordered_set uniqueVids; - Value curVertex; - for (; iter->valid(); iter->next()) { - if (stepFilter != nullptr) { - const auto& stepFilterVal = stepFilter->eval(ctx(iter)); - if (!stepFilterVal.isBool() || !stepFilterVal.getBool()) { - continue; - } - } - const auto& edgeVal = iter->getEdge(); - if (edgeVal.empty()) { - continue; - } - auto edge = edgeVal.getEdge(); - edge.reverse(); - const auto& src = edge.src; - auto srcIter = rightAdjList_.find(src); - if (srcIter == rightAdjList_.end()) { - if (uniqueVids.emplace(src).second && rightInitVids_.find(src) == rightInitVids_.end()) { - rightNextStepVids_.emplace_back(src); - } - std::vector adjEdges({edge}); - rightAdjList_.emplace(src, std::move(adjEdges)); - } else { - srcIter->second.emplace_back(edge); - } - const auto& vertex = iter->getVertex(); - if (curVertex != vertex) { - curVertex = vertex; - if (rightSteps_ == 1) { - // delete item equal to vertex.vid - rightInitVids_.erase(vertex); - // add vertex to table - rightInitVids_.emplace(vertex); - } - auto dstIter = rightAdjList_.find(vertex); - if (dstIter != rightAdjList_.end()) { - rightAdjList_[vertex] = dstIter->second; - } - } - } -} - -void AllPathsExecutor::expandFromLeft(GetNeighborsIter* iter) { +void AllPathsExecutor::expand(GetNeighborsIter* iter, bool reverse) { if (iter->numRows() == 0) { return; } + auto& adjList = reverse ? rightAdjList_ : leftAdjList_; + auto& nextStepVids = reverse ? rightNextStepVids_ : leftNextStepVids_; + nextStepVids.clear(); auto* stepFilter = pathNode_->stepFilter(); QueryExpressionContext ctx(ectx_); @@ -242,61 +192,23 @@ void AllPathsExecutor::expandFromLeft(GetNeighborsIter* iter) { continue; } const auto& dst = edge.getEdge().dst; - if (leftAdjList_.find(dst) == leftAdjList_.end() && uniqueVids.emplace(dst).second) { - leftNextStepVids_.emplace_back(dst); + if (adjList.find(dst) == adjList.end() && uniqueVids.emplace(dst).second) { + nextStepVids.emplace_back(dst); } const auto& vertex = iter->getVertex(); curVertex = curVertex.empty() ? vertex : curVertex; if (curVertex != vertex) { - leftAdjList_.emplace(curVertex, std::move(adjEdges)); + adjList.emplace(curVertex, std::move(adjEdges)); curVertex = vertex; } adjEdges.emplace_back(edge); } if (!curVertex.empty()) { - leftAdjList_.emplace(curVertex, std::move(adjEdges)); + adjList.emplace(curVertex, std::move(adjEdges)); } } folly::Future AllPathsExecutor::buildResult() { - // when the key in the right adjacency list does not exist in the left adjacency list - // add key & values to the left adjacency list, - // if key exists, discard the right adjacency's key & values - // because the right adjacency list may have fewer edges - // a->c->o, a->b, c->f, f->o - time::Duration mergeAdjTime; - for (auto& rAdj : rightAdjList_) { - auto& src = rAdj.first; - auto iter = leftAdjList_.find(src); - if (iter == leftAdjList_.end()) { - if (!src.isVertex()) { - Value val(Vertex(src, {})); - leftAdjList_.emplace(val, std::move(rAdj.second)); - emptyPropVids_.emplace_back(src); - } else { - leftAdjList_.emplace(src, std::move(rAdj.second)); - } - } - } - if (rightSteps_ == 0) { - std::unordered_set rightVids; - rightVids.reserve(rightInitVids_.size()); - for (auto& vid : rightInitVids_) { - Value val = Vertex(vid, {}); - rightVids.emplace(val); - emptyPropVids_.emplace_back(vid); - } - rightInitVids_.swap(rightVids); - } - if (leftSteps_ == 0) { - for (auto& vid : leftInitVids_) { - auto iter = leftAdjList_.find(vid); - if (iter != leftAdjList_.end()) { - emptyPropVids_.emplace_back(vid); - } - } - } - otherStats_.emplace("merge_adj_time", folly::sformat("{}(us)", mergeAdjTime.elapsedInUSec())); time::Duration buildPathTime; auto future = buildPathMultiJobs(); return future.via(runner()) @@ -315,103 +227,84 @@ folly::Future AllPathsExecutor::buildResult() { } folly::Future AllPathsExecutor::buildPathMultiJobs() { - auto pathsPtr = std::make_shared>(); - if (threadLocalPtr_.get() == nullptr) { - threadLocalPtr_.reset(new std::deque()); - } - for (auto& vid : leftInitVids_) { - auto vidIter = leftAdjList_.find(vid); - if (vidIter == leftAdjList_.end()) { - continue; - } - auto& src = vidIter->first; - auto& adjEdges = vidIter->second; - if (adjEdges.empty()) { - continue; - } - pathsPtr->reserve(adjEdges.size() + pathsPtr->size()); - for (auto& edge : adjEdges) { - threadLocalPtr_->emplace_back(NPath(src, edge)); - pathsPtr->emplace_back(&threadLocalPtr_->back()); - } - } - size_t step = 2; - auto future = doBuildPath(step, 0, pathsPtr->size(), pathsPtr); - return future.via(runner()).thenValue([this](std::vector>&& paths) { - memory::MemoryCheckGuard guard; + std::vector>> futures; + futures.emplace_back( + folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, false); })); + futures.emplace_back( + folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, true); })); - if (!paths.empty()) { - time::Duration convertPathTime; - if (paths.size() > limit_) { - paths.resize(limit_); - } - for (auto& path : paths) { - result_.rows.emplace_back(convertNPath2Row(path.first, path.second)); - } - otherStats_.emplace("convert_path_time", - folly::sformat("{}(us)", convertPathTime.elapsedInUSec())); - } - return Status::OK(); - }); -} + time::Duration conjunctPathTime; + return folly::collect(futures) + .via(runner()) + .thenValue([this](std::vector>&& paths) { + memory::MemoryCheckGuard guard; + auto& leftPaths = paths.front(); + auto& rightPaths = paths.back(); -// construct ROW[src1, [e1, v2, e2], v3] -Row AllPathsExecutor::convertNPath2Row(NPath* path, Value dst) { - std::vector list; - NPath* head = path; - while (head != nullptr) { - list.emplace_back(head->edge); - list.emplace_back(head->vertex); - head = head->p; - } - Row row; - // add src; - row.values.emplace_back(list.back()); - list.pop_back(); - std::reverse(list.begin(), list.end()); - List edgeList(std::move(list)); - row.values.emplace_back(std::move(edgeList)); - row.values.emplace_back(std::move(dst)); - return row; + if (leftSteps_ == 0) { + buildOneWayPath(rightPaths, false); + return folly::makeFuture(Status::OK()); + } + if (rightSteps_ == 0) { + buildOneWayPath(leftPaths, true); + return folly::makeFuture(Status::OK()); + } + + return conjunctPath(leftPaths, rightPaths); + }) + .thenValue([this, conjunctPathTime](auto&& resps) { + NG_RETURN_IF_ERROR(resps); + otherStats_.emplace("conjunct_path_time", + folly::sformat("{}(us)", conjunctPathTime.elapsedInUSec())); + return Status::OK(); + }); } -folly::Future>> -AllPathsExecutor::doBuildPath(size_t step, - size_t start, - size_t end, - std::shared_ptr> pathsPtr) { - if (cnt_.load(std::memory_order_relaxed) >= limit_) { - return folly::makeFuture>>( - std::vector>()); +folly::Future> AllPathsExecutor::doBuildPath( + size_t step, + size_t start, + size_t end, + std::shared_ptr> pathsPtr, + bool reverse) { + auto maxStep = reverse ? rightSteps_ : leftSteps_; + if (step > maxStep) { + return folly::makeFuture>(std::vector()); } if (threadLocalPtr_.get() == nullptr) { threadLocalPtr_.reset(new std::deque()); } - auto& adjList = leftAdjList_; - auto currentStepResult = std::make_unique>>(); + auto& adjList = reverse ? rightAdjList_ : leftAdjList_; auto newPathsPtr = std::make_shared>(); - for (auto i = start; i < end; ++i) { - auto path = (*pathsPtr)[i]; - auto& edgeValue = path->edge; - DCHECK(edgeValue.isEdge()); - auto& dst = edgeValue.getEdge().dst; - auto dstIter = rightInitVids_.find(dst); - if (dstIter != rightInitVids_.end()) { - currentStepResult->emplace_back(std::make_pair(path, *dstIter)); - ++cnt_; - if (cnt_.load(std::memory_order_relaxed) >= limit_) { - break; + if (step == 1) { + auto& initVids = reverse ? rightInitVids_ : leftInitVids_; + for (auto& vid : initVids) { + auto vidIter = adjList.find(vid); + if (vidIter == adjList.end()) { + continue; + } + auto& src = vidIter->first; + auto& adjEdges = vidIter->second; + if (adjEdges.empty()) { + continue; + } + for (auto& edge : adjEdges) { + threadLocalPtr_->emplace_back(NPath(src, edge)); + newPathsPtr->emplace_back(&threadLocalPtr_->back()); } } - if (step <= maxStep_) { + } else { + for (auto i = start; i < end; ++i) { + auto path = (*pathsPtr)[i]; + auto& edgeValue = path->edge; + DCHECK(edgeValue.isEdge()); + auto& dst = edgeValue.getEdge().dst; auto adjIter = adjList.find(dst); if (adjIter == adjList.end()) { continue; } - - auto& adjedges = adjIter->second; - for (auto& edge : adjedges) { + auto& adjEdges = adjIter->second; + for (auto& edge : adjEdges) { if (noLoop_) { if (hasSameVertices(path, edge.getEdge())) { continue; @@ -428,28 +321,27 @@ AllPathsExecutor::doBuildPath(size_t step, } auto newPathsSize = newPathsPtr->size(); - if (step > maxStep_ || newPathsSize == 0) { - return folly::makeFuture>>(std::move(*currentStepResult)); + if (newPathsSize == 0) { + return folly::makeFuture>(std::vector()); } - std::vector>>> futures; + std::vector>> futures; if (newPathsSize < FLAGS_path_batch_size) { - futures.emplace_back(folly::via(runner(), [this, step, newPathsSize, newPathsPtr]() { - return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr); + futures.emplace_back(folly::via(runner(), [this, step, newPathsSize, newPathsPtr, reverse]() { + return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr, reverse); })); } else { for (size_t _start = 0; _start < newPathsSize; _start += FLAGS_path_batch_size) { auto tmp = _start + FLAGS_path_batch_size; auto _end = tmp > newPathsSize ? newPathsSize : tmp; - futures.emplace_back(folly::via(runner(), [this, step, _start, _end, newPathsPtr]() { - return doBuildPath(step + 1, _start, _end, newPathsPtr); + futures.emplace_back(folly::via(runner(), [this, step, _start, _end, newPathsPtr, reverse]() { + return doBuildPath(step + 1, _start, _end, newPathsPtr, reverse); })); } } return folly::collect(futures).via(runner()).thenValue( - [pathPtr = std::move(currentStepResult)]( - std::vector>>&& paths) { + [currentStepResult = newPathsPtr](std::vector>&& paths) { memory::MemoryCheckGuard guard; - std::vector> result = std::move(*pathPtr); + std::vector result = std::move(*currentStepResult); for (auto& path : paths) { if (path.empty()) { continue; @@ -462,6 +354,241 @@ AllPathsExecutor::doBuildPath(size_t step, }); } +std::vector AllPathsExecutor::convertNPath2List(NPath* path, bool reverse) { + std::vector list; + NPath* head = path; + while (head != nullptr) { + list.emplace_back(head->edge); + list.emplace_back(head->vertex); + head = head->p; + } + if (!reverse) { + std::reverse(list.begin(), list.end()); + } + return list; +} + +void AllPathsExecutor::buildOneWayPath(std::vector& paths, bool reverse) { + auto& initVids = reverse ? rightInitVids_ : leftInitVids_; + for (auto& path : paths) { + auto& edgeVal = path->edge; + auto& dst = edgeVal.getEdge().dst; + auto findDst = initVids.find(dst); + if (findDst == initVids.end()) { + continue; + } + cnt_.fetch_add(1, std::memory_order_relaxed); + if (cnt_.load(std::memory_order_relaxed) > limit_) { + break; + } + auto valueList = convertNPath2List(path, !reverse); + Row row; + Value emptyPropVertex(Vertex(dst, {})); + if (!reverse) { + auto dstVertex = valueList.back(); + valueList.pop_back(); + // add src + row.values.emplace_back(emptyPropVertex); + List edgeList(std::move(valueList)); + // add edgeList + row.values.emplace_back(std::move(edgeList)); + // add dst + row.values.emplace_back(dstVertex); + } else { + // add src + row.values.emplace_back(valueList.front()); + std::vector tmp(std::make_move_iterator(valueList.begin() + 1), + std::make_move_iterator(valueList.end())); + List edgeList(std::move(tmp)); + // add edgeList + row.values.emplace_back(std::move(edgeList)); + // add dst + row.values.emplace_back(emptyPropVertex); + } + auto iter = emptyPropVertices_.find(emptyPropVertex); + if (iter == emptyPropVertices_.end()) { + emptyPropVids_.emplace_back(dst); + emptyPropVertices_.emplace(emptyPropVertex); + } + result_.rows.emplace_back(std::move(row)); + } +} + +std::vector AllPathsExecutor::buildOneWayPathFromHashTable(bool reverse) { + auto& initVids = reverse ? rightInitVids_ : leftInitVids_; + std::vector result; + for (auto& vid : initVids) { + auto findVid = hashTable_.find(vid); + if (findVid == hashTable_.end()) { + continue; + } + auto& paths = findVid->second; + Value emptyPropVertex(Vertex(vid, {})); + if (!reverse) { + for (auto& path : paths) { + cnt_.fetch_add(1, std::memory_order_relaxed); + if (cnt_.load(std::memory_order_relaxed) > limit_) { + break; + } + Row row; + row.values.emplace_back(emptyPropVertex); + auto& dstVertex = path.back(); + std::vector tmp(path.begin(), path.end() - 1); + List edgeList(std::move(tmp)); + row.values.emplace_back(std::move(edgeList)); + row.values.emplace_back(dstVertex); + result.emplace_back(std::move(row)); + } + } else { + for (auto& path : paths) { + cnt_.fetch_add(1, std::memory_order_relaxed); + if (cnt_.load(std::memory_order_relaxed) > limit_) { + break; + } + Row row; + row.values.emplace_back(path.front()); + std::vector tmp(path.begin() + 1, path.end()); + List edgeList(std::move(tmp)); + row.values.emplace_back(std::move(edgeList)); + row.values.emplace_back(emptyPropVertex); + result.emplace_back(std::move(row)); + } + } + auto iter = emptyPropVertices_.find(emptyPropVertex); + if (iter == emptyPropVertices_.end()) { + emptyPropVids_.emplace_back(vid); + emptyPropVertices_.emplace(emptyPropVertex); + } + } + return result; +} + +folly::Future AllPathsExecutor::conjunctPath(std::vector& leftPaths, + std::vector& rightPaths) { + if (leftPaths.empty() || rightPaths.empty()) { + return folly::makeFuture(Status::OK()); + } + bool reverse = false; + if (leftPaths.size() < rightPaths.size()) { + buildHashTable(leftPaths, reverse); + probePaths_ = std::move(rightPaths); + } else { + reverse = true; + buildHashTable(rightPaths, reverse); + probePaths_ = std::move(leftPaths); + } + auto oneWayPath = buildOneWayPathFromHashTable(!reverse); + + size_t probeSize = probePaths_.size(); + std::vector>> futures; + if (probeSize < FLAGS_path_batch_size) { + futures.emplace_back(folly::via( + runner(), [this, probeSize, reverse]() { return probe(0, probeSize, reverse); })); + } else { + for (size_t start = 0; start < probeSize; start += FLAGS_path_batch_size) { + auto tmp = start + FLAGS_path_batch_size; + auto end = tmp > probeSize ? probeSize : tmp; + futures.emplace_back(folly::via( + runner(), [this, start, end, reverse]() { return probe(start, end, reverse); })); + } + } + return folly::collect(futures).via(runner()).thenValue( + [this, path = std::move(oneWayPath)](std::vector>&& resps) { + memory::MemoryCheckGuard guard; + result_.rows = std::move(path); + for (auto& rows : resps) { + if (rows.empty()) { + continue; + } + auto& emptyPropVerticesRow = rows.back(); + for (auto& emptyPropVertex : emptyPropVerticesRow.values) { + auto iter = emptyPropVertices_.find(emptyPropVertex); + if (iter == emptyPropVertices_.end()) { + emptyPropVids_.emplace_back(emptyPropVertex.getVertex().vid); + } + emptyPropVertices_.emplace(emptyPropVertex); + } + rows.pop_back(); + result_.rows.insert(result_.rows.end(), + std::make_move_iterator(rows.begin()), + std::make_move_iterator(rows.end())); + } + if (result_.rows.size() > limit_) { + result_.rows.resize(limit_); + } + return Status::OK(); + }); +} + +void AllPathsExecutor::buildHashTable(std::vector& paths, bool reverse) { + for (auto& path : paths) { + auto& edgeVal = path->edge; + const auto& edge = edgeVal.getEdge(); + auto findDst = hashTable_.find(edge.dst); + if (findDst == hashTable_.end()) { + std::vector> tmp({convertNPath2List(path, reverse)}); + hashTable_.emplace(edge.dst, std::move(tmp)); + } else { + findDst->second.emplace_back(convertNPath2List(path, reverse)); + } + } +} + +std::vector AllPathsExecutor::probe(size_t start, size_t end, bool reverse) { + auto buildPath = [](std::vector& leftPath, + const Value& intersectVertex, + std::vector& rightPath) { + Row row; + row.values.emplace_back(leftPath.front()); + std::vector edgeList(leftPath.begin() + 1, leftPath.end()); + edgeList.emplace_back(intersectVertex); + edgeList.insert(edgeList.end(), rightPath.begin(), rightPath.end() - 1); + row.values.emplace_back(List(std::move(edgeList))); + row.values.emplace_back(rightPath.back()); + return row; + }; + + size_t minLength = reverse ? rightSteps_ * 2 : leftSteps_ * 2; + std::vector result; + Row emptyPropVerticesRow; + for (size_t i = start; i < end; ++i) { + auto& probePath = probePaths_[i]; + auto& edgeVal = probePath->edge; + const auto& intersectVid = edgeVal.getEdge().dst; + Value intersectVertex(Vertex(intersectVid, {})); + auto findDst = hashTable_.find(intersectVid); + if (findDst == hashTable_.end()) { + continue; + } + auto valueList = convertNPath2List(probePath, !reverse); + + for (auto& path : findDst->second) { + if (path.size() != minLength) { + continue; + } + auto& leftPath = reverse ? valueList : path; + auto& rightPath = reverse ? path : valueList; + if (noLoop_) { + if (hasSameVertices(leftPath, intersectVid, rightPath)) { + continue; + } + } else { + if (hasSameEdge(leftPath, rightPath)) { + continue; + } + } + cnt_.fetch_add(1, std::memory_order_relaxed); + if (cnt_.load(std::memory_order_relaxed) > limit_) { + break; + } + result.emplace_back(buildPath(leftPath, intersectVertex, rightPath)); + } + emptyPropVerticesRow.values.emplace_back(intersectVertex); + } + result.emplace_back(emptyPropVerticesRow); + return result; +} + folly::Future AllPathsExecutor::getPathProps() { auto future = getProps(emptyPropVids_, pathNode_->vertexProps()); return future.via(runner()).thenValue([this](std::vector&& vertices) { @@ -470,11 +597,11 @@ folly::Future AllPathsExecutor::getPathProps() { if (vertex.empty()) { continue; } - auto iter = leftAdjList_.find(vertex); - if (iter != leftAdjList_.end()) { - auto val = iter->first; + auto range = emptyPropVertices_.equal_range(vertex); + for (auto iter = range.first; iter != range.second; ++iter) { + auto val = *iter; auto& mutableVertex = val.mutableVertex(); - mutableVertex.tags.swap(vertex.mutableVertex().tags); + mutableVertex.tags = vertex.mutableVertex().tags; } } return finish(ResultBuilder().value(Value(std::move(result_))).build()); @@ -508,5 +635,42 @@ bool AllPathsExecutor::hasSameVertices(NPath* path, const Edge& edge) { return false; } +bool AllPathsExecutor::hasSameEdge(std::vector& leftPaths, std::vector& rightPaths) { + for (auto& leftEdge : leftPaths) { + if (!leftEdge.isEdge()) { + continue; + } + for (auto& rightEdge : rightPaths) { + if (rightEdge.isEdge() && rightEdge.getEdge().keyEqual(leftEdge.getEdge())) { + return true; + } + } + } + return false; +} + +bool AllPathsExecutor::hasSameVertices(const std::vector& leftPaths, + const Value& intersectVertex, + const std::vector& rightPaths) { + bool flag = leftPaths.size() > rightPaths.size(); + const auto& hashPath = flag ? rightPaths : leftPaths; + const auto& probePath = flag ? leftPaths : rightPaths; + VidHashSet hashTable; + for (const auto& v : hashPath) { + if (v.isVertex()) { + hashTable.emplace(v); + } + } + hashTable.emplace(intersectVertex); + for (const auto& v : probePath) { + if (v.isVertex()) { + if (hashTable.find(v) != hashTable.end()) { + return true; + } + } + } + return false; +} + } // namespace graph } // namespace nebula diff --git a/src/graph/executor/algo/AllPathsExecutor.h b/src/graph/executor/algo/AllPathsExecutor.h index 695a8efaa06..0d4417764d3 100644 --- a/src/graph/executor/algo/AllPathsExecutor.h +++ b/src/graph/executor/algo/AllPathsExecutor.h @@ -67,14 +67,15 @@ class AllPathsExecutor final : public StorageAccessExecutor { folly::Future getNeighbors(bool reverse); - void expandFromLeft(GetNeighborsIter* iter); + void expand(GetNeighborsIter* iter, bool reverse); - void expandFromRight(GetNeighborsIter* iter); + std::vector convertNPath2List(NPath* path, bool reverse); - Row convertNPath2Row(NPath* path, Value dst); - - folly::Future>> doBuildPath( - size_t step, size_t start, size_t end, std::shared_ptr> paths); + folly::Future> doBuildPath(size_t step, + size_t start, + size_t end, + std::shared_ptr> paths, + bool reverse); folly::Future getPathProps(); @@ -82,10 +83,27 @@ class AllPathsExecutor final : public StorageAccessExecutor { folly::Future buildResult(); + void buildHashTable(std::vector& paths, bool reverse); + + std::vector probe(size_t start, size_t end, bool reverse); + + folly::Future conjunctPath(std::vector& leftPaths, + std::vector& rightPaths); + bool hasSameEdge(NPath* path, const Edge& edge); bool hasSameVertices(NPath* path, const Edge& edge); + bool hasSameEdge(std::vector& leftPaths, std::vector& rightPaths); + + bool hasSameVertices(const std::vector& leftPaths, + const Value& intersectVertex, + const std::vector& rightPaths); + + void buildOneWayPath(std::vector& paths, bool reverse); + + std::vector buildOneWayPathFromHashTable(bool reverse); + private: const AllPaths* pathNode_{nullptr}; bool withProp_{false}; @@ -107,8 +125,12 @@ class AllPathsExecutor final : public StorageAccessExecutor { DataSet result_; std::vector emptyPropVids_; + std::unordered_multiset emptyPropVertices_; class NewTag {}; folly::ThreadLocalPtr, NewTag> threadLocalPtr_; + + std::unordered_map>> hashTable_; + std::vector probePaths_; }; } // namespace graph } // namespace nebula diff --git a/src/graph/executor/query/ExpandAllExecutor.cpp b/src/graph/executor/query/ExpandAllExecutor.cpp index dd85469bbb4..396ddc97c7d 100644 --- a/src/graph/executor/query/ExpandAllExecutor.cpp +++ b/src/graph/executor/query/ExpandAllExecutor.cpp @@ -11,6 +11,7 @@ using nebula::storage::StorageClient; using nebula::storage::StorageRpcResponse; +using nebula::storage::cpp2::GetDstBySrcResponse; using nebula::storage::cpp2::GetNeighborsResponse; namespace nebula { @@ -60,9 +61,78 @@ folly::Future ExpandAllExecutor::execute() { if (nextStepVids_.empty()) { return finish(ResultBuilder().value(Value(std::move(result_))).build()); } + if (vertexColumns_ == nullptr && edgeColumns_ == nullptr) { + return GetDstBySrc(); + } return getNeighbors(); } +folly::Future ExpandAllExecutor::GetDstBySrc() { + currentStep_++; + time::Duration getDstTime; + StorageClient* storageClient = qctx_->getStorageClient(); + StorageClient::CommonRequestParam param(expand_->space(), + qctx_->rctx()->session()->id(), + qctx_->plan()->id(), + qctx_->plan()->isProfileEnabled()); + std::vector vids(nextStepVids_.size()); + std::move(nextStepVids_.begin(), nextStepVids_.end(), vids.begin()); + return storageClient->getDstBySrc(param, std::move(vids), expand_->edgeTypes()) + .via(runner()) + .ensure([this, getDstTime]() { + SCOPED_TIMER(&execTime_); + otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getDstTime.elapsedInUSec())); + }) + .thenValue([this](StorageRpcResponse&& resps) { + memory::MemoryCheckGuard guard; + nextStepVids_.clear(); + SCOPED_TIMER(&execTime_); + auto& hostLatency = resps.hostLatency(); + for (size_t i = 0; i < hostLatency.size(); ++i) { + size_t size = 0u; + auto& result = resps.responses()[i]; + if (result.dsts_ref().has_value()) { + size = (*result.dsts_ref()).size(); + } + auto info = util::collectRespProfileData(result.result, hostLatency[i], size); + otherStats_.emplace(folly::sformat("step{} resp [{}]", currentStep_, i), + folly::toPrettyJson(info)); + } + auto result = handleCompleteness(resps, FLAGS_accept_partial_success); + if (!result.ok()) { + return folly::makeFuture(result.status()); + } + auto& responses = resps.responses(); + if (currentStep_ <= maxSteps_) { + for (auto& resp : responses) { + auto* dataset = resp.get_dsts(); + if (dataset == nullptr) continue; + for (auto& row : dataset->rows) { + nextStepVids_.insert(row.values.begin(), row.values.end()); + // add the dataset of each step to result_ + result_.rows.emplace_back(row); + } + } + if (nextStepVids_.empty()) { + finish(ResultBuilder().value(Value(std::move(result_))).build()); + return folly::makeFuture(Status::OK()); + } + return GetDstBySrc(); + } else { + for (auto& resp : responses) { + auto* dataset = resp.get_dsts(); + if (dataset == nullptr) continue; + for (auto& row : dataset->rows) { + // add the dataset of each step to result_ + result_.rows.emplace_back(row); + } + } + finish(ResultBuilder().value(Value(std::move(result_))).build()); + return folly::makeFuture(Status::OK()); + } + }); +} + folly::Future ExpandAllExecutor::getNeighbors() { currentStep_++; StorageClient* storageClient = qctx_->getStorageClient(); diff --git a/src/graph/executor/query/ExpandAllExecutor.h b/src/graph/executor/query/ExpandAllExecutor.h index 6f5388b6eac..388d434925b 100644 --- a/src/graph/executor/query/ExpandAllExecutor.h +++ b/src/graph/executor/query/ExpandAllExecutor.h @@ -59,6 +59,8 @@ class ExpandAllExecutor final : public StorageAccessExecutor { folly::Future getNeighbors(); + folly::Future GetDstBySrc(); + void getNeighborsFromCache(std::unordered_map>& dst2VidsMap, std::unordered_set& visitedVids, std::vector& samples); diff --git a/src/graph/planner/ngql/GoPlanner.cpp b/src/graph/planner/ngql/GoPlanner.cpp index 220f8f0f404..1fc04cc9973 100644 --- a/src/graph/planner/ngql/GoPlanner.cpp +++ b/src/graph/planner/ngql/GoPlanner.cpp @@ -160,18 +160,42 @@ PlanNode* GoPlanner::buildJoinDstPlan(PlanNode* dep) { SubPlan GoPlanner::doSimplePlan() { auto qctx = goCtx_->qctx; - size_t step = goCtx_->steps.mSteps(); + size_t minStep = goCtx_->steps.mSteps(); + size_t maxStep = goCtx_->steps.nSteps(); + size_t steps = minStep; + if (minStep != maxStep) { + steps = minStep == 0 ? minStep : minStep - 1; + } + auto* expand = Expand::make(qctx, startNode_, goCtx_->space.id, false, // random - step, + steps, buildEdgeProps(true)); expand->setEdgeTypes(buildEdgeTypes()); expand->setColNames({"_expand_vid"}); expand->setInputVar(goCtx_->vidsVar); - auto* dedup = Dedup::make(qctx, expand); + auto dep = expand; + if (minStep != maxStep) { + // simple m to n case + // go m to n steps from 'xxx' over edge yield distinct edge._dst + dep = ExpandAll::make(qctx, + dep, + goCtx_->space.id, + false, // random + minStep, + maxStep, + buildEdgeProps(true), + nullptr, + nullptr, + nullptr); + dep->setEdgeTypes(buildEdgeTypes()); + dep->setColNames({"_expandall_vid"}); + } + + auto* dedup = Dedup::make(qctx, dep); auto pool = qctx->objPool(); auto* newYieldExpr = pool->makeAndAdd(); diff --git a/src/graph/validator/GoValidator.cpp b/src/graph/validator/GoValidator.cpp index 7ef45b1b9e0..cf65da70e15 100644 --- a/src/graph/validator/GoValidator.cpp +++ b/src/graph/validator/GoValidator.cpp @@ -292,7 +292,7 @@ bool GoValidator::checkDstPropOrVertexExist(const Expression* expr) { } bool GoValidator::isSimpleCase() { - if (!goCtx_->limits.empty() || !goCtx_->distinct || goCtx_->filter || goCtx_->steps.isMToN() || + if (!goCtx_->limits.empty() || !goCtx_->distinct || goCtx_->filter || goCtx_->from.fromType != FromType::kInstantExpr) { return false; } diff --git a/src/graph/validator/test/QueryValidatorTest.cpp b/src/graph/validator/test/QueryValidatorTest.cpp index d999ef3f95c..ad5ef739f97 100644 --- a/src/graph/validator/test/QueryValidatorTest.cpp +++ b/src/graph/validator/test/QueryValidatorTest.cpp @@ -673,13 +673,13 @@ TEST_F(QueryValidatorTest, GoMToN) { { std::string query = "GO 1 TO 2 STEPS FROM '1' OVER like YIELD DISTINCT like._dst"; std::vector expected = { - PK::kDedup, PK::kProject, PK::kExpandAll, PK::kExpand, PK::kStart}; + PK::kProject, PK::kDedup, PK::kExpandAll, PK::kExpand, PK::kStart}; EXPECT_TRUE(checkResult(query, expected)); } { std::string query = "GO 0 TO 2 STEPS FROM '1' OVER like YIELD DISTINCT like._dst"; std::vector expected = { - PK::kDedup, PK::kProject, PK::kExpandAll, PK::kExpand, PK::kStart}; + PK::kProject, PK::kDedup, PK::kExpandAll, PK::kExpand, PK::kStart}; EXPECT_TRUE(checkResult(query, expected)); } { @@ -700,13 +700,13 @@ TEST_F(QueryValidatorTest, GoMToN) { { std::string query = "GO 1 TO 2 STEPS FROM '1' OVER like REVERSELY YIELD DISTINCT like._dst"; std::vector expected = { - PK::kDedup, PK::kProject, PK::kExpandAll, PK::kExpand, PK::kStart}; + PK::kProject, PK::kDedup, PK::kExpandAll, PK::kExpand, PK::kStart}; EXPECT_TRUE(checkResult(query, expected)); } { std::string query = "GO 1 TO 2 STEPS FROM '1' OVER like BIDIRECT YIELD DISTINCT like._dst"; std::vector expected = { - PK::kDedup, PK::kProject, PK::kExpandAll, PK::kExpand, PK::kStart}; + PK::kProject, PK::kDedup, PK::kExpandAll, PK::kExpand, PK::kStart}; EXPECT_TRUE(checkResult(query, expected)); } { diff --git a/tests/tck/features/go/SimpleCase.feature b/tests/tck/features/go/SimpleCase.feature index 625670d2899..addcb1107ba 100644 --- a/tests/tck/features/go/SimpleCase.feature +++ b/tests/tck/features/go/SimpleCase.feature @@ -266,11 +266,11 @@ Feature: Simple case And the execution plan should be: | id | name | dependencies | operator info | | 6 | Aggregate | 5 | | - | 5 | Dedup | 4 | | - | 4 | Project | 3 | | + | 5 | Project | 4 | | + | 4 | Dedup | 3 | | | 3 | ExpandAll | 2 | | - | 2 | Expand | 1 | | - | 1 | Start | | | + | 2 | Expand | 0 | | + | 0 | Start | | | When profiling query: """ GO 1 to 3 STEP FROM "Tony Parker" OVER like WHERE $$.player.age > 40 YIELD DISTINCT id($$), $$.player.age as age, $$.player.name | ORDER BY $-.age @@ -374,11 +374,11 @@ Feature: Simple case | 12 | Dedup | 11 | | | 11 | Project | 10 | | | 10 | HashInnerJoin | 5,9 | | - | 5 | Dedup | 4 | | - | 4 | Project | 3 | | + | 5 | Project | 4 | | + | 4 | Dedup | 3 | | | 3 | ExpandAll | 2 | | - | 2 | Expand | 1 | | - | 1 | Start | | | + | 2 | Expand | 0 | | + | 0 | Start | | | | 9 | ExpandAll | 8 | | | 8 | Expand | 7 | | | 7 | Argument | | |