diff --git a/src/graph/executor/query/ProjectExecutor.cpp b/src/graph/executor/query/ProjectExecutor.cpp index 64db18cede0..dd1104245b9 100644 --- a/src/graph/executor/query/ProjectExecutor.cpp +++ b/src/graph/executor/query/ProjectExecutor.cpp @@ -15,7 +15,6 @@ folly::Future ProjectExecutor::execute() { auto *project = asNode(node()); auto iter = ectx_->getResult(project->inputVar()).iter(); DCHECK(!!iter); - QueryExpressionContext ctx(ectx_); if (FLAGS_max_job_size <= 1) { auto ds = handleJob(0, iter->size(), iter.get()); diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index e4c7a9eadd3..f629c7f7582 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -22,54 +22,61 @@ folly::Future TraverseExecutor::execute() { if (!status.ok()) { return error(std::move(status)); } - return traverse(); -} - -Status TraverseExecutor::close() { - // clear the members - vids_.clear(); - return Executor::close(); + if (vids_.empty()) { + DataSet emptyDs; + return finish(ResultBuilder().value(Value(std::move(emptyDs))).build()); + } + return getNeighbors(); } Status TraverseExecutor::buildRequestVids() { - time::Duration dur; SCOPED_TIMER(&execTime_); const auto& inputVar = traverse_->inputVar(); auto inputIter = ectx_->getResult(inputVar).iterRef(); auto iter = static_cast(inputIter); - - const auto& spaceInfo = qctx()->rctx()->session()->space(); - const auto& metaVidType = *(spaceInfo.spaceDesc.vid_type_ref()); - auto vidType = SchemaUtil::propTypeToValueType(metaVidType.get_type()); + size_t iterSize = iter->size(); + vids_.reserve(iterSize); + initVids_.reserve(iterSize); auto* src = traverse_->src(); - QueryExpressionContext ctx(ectx_); - HashMap prev; bool mv = movable(traverse_->inputVars().front()); - for (; iter->valid(); iter->next()) { - const auto& vid = src->eval(ctx(iter)); - if (vid.type() != vidType) { - LOG(ERROR) << "Mismatched vid type: " << vid.type() << ", space vid type: " << vidType; - continue; + if (traverse_->trackPrevPath()) { + std::unordered_set uniqueVid; + uniqueVid.reserve(iterSize); + for (; iter->valid(); iter->next()) { + const auto& vid = src->eval(ctx(iter)); + auto prevPath = mv ? iter->moveRow() : *iter->row(); + auto vidIter = dst2PathsMap_.find(vid); + if (vidIter == dst2PathsMap_.end()) { + std::vector tmp({std::move(prevPath)}); + dst2PathsMap_.emplace(vid, std::move(tmp)); + } else { + vidIter->second.emplace_back(std::move(prevPath)); + } + if (uniqueVid.emplace(vid).second) { + vids_.emplace_back(vid); + initVids_.emplace_back(vid); + } + } + } else { + initVids_.reserve(iterSize); + const auto& spaceInfo = qctx()->rctx()->session()->space(); + const auto& metaVidType = *(spaceInfo.spaceDesc.vid_type_ref()); + auto vidType = SchemaUtil::propTypeToValueType(metaVidType.get_type()); + for (; iter->valid(); iter->next()) { + const auto& vid = src->eval(ctx(iter)); + if (vid.type() != vidType) { + LOG(ERROR) << "Mismatched vid type: " << vid.type() << ", space vid type: " << vidType; + continue; + } + vids_.emplace_back(vid); + initVids_.emplace_back(vid); } - // Need copy here, Argument executor may depends on this variable. - auto prePath = mv ? iter->moveRow() : *iter->row(); - buildPath(prev, vid, std::move(prePath)); - vids_.emplace(vid); } - paths_.emplace_back(std::move(prev)); return Status::OK(); } -folly::Future TraverseExecutor::traverse() { - if (vids_.empty()) { - DataSet emptyResult; - return finish(ResultBuilder().value(Value(std::move(emptyResult))).build()); - } - return getNeighbors(); -} - folly::Future TraverseExecutor::getNeighbors() { currentStep_++; time::Duration getNbrTime; @@ -79,12 +86,10 @@ folly::Future TraverseExecutor::getNeighbors() { qctx()->rctx()->session()->id(), qctx()->plan()->id(), qctx()->plan()->isProfileEnabled()); - std::vector vids(vids_.begin(), vids_.end()); - vids_.clear(); return storageClient ->getNeighbors(param, {nebula::kVid}, - std::move(vids), + std::move(vids_), traverse_->edgeTypes(), traverse_->edgeDirection(), finalStep ? traverse_->statProps() : nullptr, @@ -99,6 +104,7 @@ folly::Future TraverseExecutor::getNeighbors() { nullptr) .via(runner()) .thenValue([this, getNbrTime](StorageRpcResponse&& resp) mutable { + vids_.clear(); SCOPED_TIMER(&execTime_); addStats(resp, getNbrTime.elapsedInUSec()); return handleResponse(std::move(resp)); @@ -107,7 +113,7 @@ folly::Future TraverseExecutor::getNeighbors() { Expression* TraverseExecutor::selectFilter() { Expression* filter = nullptr; - if (!(currentStep_ == 1 && zeroStep())) { + if (!(currentStep_ == 1 && traverse_->zeroStep())) { filter = const_cast(traverse_->filter()); } if (currentStep_ == 1) { @@ -140,10 +146,8 @@ folly::Future TraverseExecutor::handleResponse(RpcResponse&& resps) { if (!result.ok()) { return folly::makeFuture(std::move(result).status()); } - - auto& responses = resps.responses(); List list; - for (auto& resp : responses) { + for (auto& resp : resps.responses()) { auto dataset = resp.get_vertices(); if (dataset == nullptr) { continue; @@ -151,70 +155,37 @@ folly::Future TraverseExecutor::handleResponse(RpcResponse&& resps) { list.values.emplace_back(std::move(*dataset)); } auto listVal = std::make_shared(std::move(list)); - - if (FLAGS_max_job_size <= 1) { - auto iter = std::make_unique(listVal); - auto status = buildInterimPath(iter.get()); - if (!status.ok()) { - return folly::makeFuture(std::move(status)); - } - if (!isFinalStep()) { - if (vids_.empty()) { - if (range_ != nullptr) { - return folly::makeFuture(buildResult()); - } else { - return folly::makeFuture(Status::OK()); - } + auto iter = std::make_unique(listVal); + if (currentStep_ == 1 && range_ && range_->min() == 0) { + result_.rows = buildZeroStepPath(iter.get()); + } + expand(iter.get()); + if (!isFinalStep()) { + if (vids_.empty()) { + if (range_ != nullptr) { + return buildResult(); } else { - return getNeighbors(); + return folly::makeFuture(Status::OK()); } } else { - return folly::makeFuture(buildResult()); + return getNeighbors(); } } else { - return std::move(buildInterimPathMultiJobs(std::make_unique(listVal))) - .via(runner()) - .thenValue([this](auto&& status) { - if (!status.ok()) { - return folly::makeFuture(std::move(status)); - } - if (!isFinalStep()) { - if (vids_.empty()) { - if (range_ != nullptr) { - return folly::makeFuture(buildResult()); - } else { - return folly::makeFuture(Status::OK()); - } - } else { - return getNeighbors(); - } - } else { - return folly::makeFuture(buildResult()); - } - }); + return buildResult(); } } -Status TraverseExecutor::buildInterimPath(GetNeighborsIter* iter) { - size_t count = 0; - - const auto& prev = paths_.back(); - if (currentStep_ == 1 && zeroStep()) { - paths_.emplace_back(); - NG_RETURN_IF_ERROR(handleZeroStep(prev, iter->getVertices(), paths_.back(), count)); - // If 0..0 case, release memory and return immediately. - if (range_ != nullptr && range_->max() == 0) { - releasePrevPaths(count); - return Status::OK(); - } +void TraverseExecutor::expand(GetNeighborsIter* iter) { + if (iter->numRows() == 0) { + return; } - paths_.emplace_back(); - auto& current = paths_.back(); - auto* vFilter = traverse_->vFilter(); auto* eFilter = traverse_->eFilter(); QueryExpressionContext ctx(ectx_); + std::unordered_set uniqueVids; + Value curVertex; + std::vector adjEdges; for (; iter->valid(); iter->next()) { if (vFilter != nullptr && currentStep_ == 1) { const auto& vFilterVal = vFilter->eval(ctx(iter)); @@ -228,258 +199,318 @@ Status TraverseExecutor::buildInterimPath(GetNeighborsIter* iter) { continue; } } - auto& dst = iter->getEdgeProp("*", kDst); - if (dst.type() == Value::Type::__EMPTY__) { - // no edge return Empty + const auto& edge = iter->getEdge(); + if (edge.empty()) { continue; } - auto srcV = iter->getVertex(); - auto e = iter->getEdge(); - // Join on dst = src - auto pathToSrcFound = prev.find(srcV.getVertex().vid); - if (pathToSrcFound == prev.end()) { - return Status::Error("Can't find prev paths."); + const auto& dst = edge.getEdge().dst; + if (adjList_.find(dst) == adjList_.end() && uniqueVids.emplace(dst).second) { + vids_.emplace_back(dst); + } + const auto& vertex = iter->getVertex(); + curVertex = curVertex.empty() ? vertex : curVertex; + if (curVertex != vertex) { + adjList_.emplace(curVertex, std::move(adjEdges)); + curVertex = vertex; } - const auto& paths = pathToSrcFound->second; - for (auto& prevPath : paths) { - if (hasSameEdge(prevPath, e.getEdge())) { + adjEdges.emplace_back(edge); + } + if (!curVertex.empty()) { + adjList_.emplace(curVertex, std::move(adjEdges)); + } +} + +std::vector TraverseExecutor::buildZeroStepPath(GetNeighborsIter* iter) { + if (!iter || iter->numRows() == 0) { + return std::vector(); + } + std::vector result; + result.reserve(iter->size()); + auto vertices = iter->getVertices(); + if (traverse_->trackPrevPath()) { + for (auto& vertex : vertices.values) { + auto dstIter = dst2PathsMap_.find(vertex); + if (dstIter == dst2PathsMap_.end()) { continue; } - vids_.emplace(dst); - - if (currentStep_ == 1) { - Row path; - if (traverse_->trackPrevPath()) { - path = prevPath; - } - path.values.emplace_back(srcV); - List neighbors; - neighbors.values.emplace_back(e); - path.values.emplace_back(std::move(neighbors)); - buildPath(current, dst, std::move(path)); - ++count; - } else { - auto path = prevPath; - auto& eList = path.values.back().mutableList().values; - eList.emplace_back(srcV); - eList.emplace_back(e); - buildPath(current, dst, std::move(path)); - ++count; + auto& prevPaths = dstIter->second; + for (auto& p : prevPaths) { + Row row = p; + List edgeList; + edgeList.values.emplace_back(vertex); + row.values.emplace_back(vertex); + row.values.emplace_back(std::move(edgeList)); + result.emplace_back(std::move(row)); } - } // `prevPath' - } // `iter' - - releasePrevPaths(count); - return Status::OK(); -} - -folly::Future TraverseExecutor::buildInterimPathMultiJobs( - std::unique_ptr iter) { - size_t pathCnt = 0; - const auto* prev = &paths_.back(); - if (currentStep_ == 1 && zeroStep()) { - paths_.emplace_back(); - NG_RETURN_IF_ERROR(handleZeroStep(*prev, iter->getVertices(), paths_.back(), pathCnt)); - // If 0..0 case, release memory and return immediately. - if (range_ != nullptr && range_->max() == 0) { - releasePrevPaths(pathCnt); - return Status::OK(); + } + } else { + for (auto& vertex : vertices.values) { + Row row; + List edgeList; + edgeList.values.emplace_back(vertex); + row.values.emplace_back(vertex); + row.values.emplace_back(std::move(edgeList)); + result.emplace_back(std::move(row)); } } - paths_.emplace_back(); + return result; +} - auto scatter = [this, prev]( - size_t begin, size_t end, Iterator* tmpIter) mutable -> StatusOr { - return handleJob(begin, end, tmpIter, *prev); - }; +folly::Future TraverseExecutor::buildResult() { + size_t minStep = 1; + size_t maxStep = 1; + if (range_ != nullptr) { + minStep = range_->min(); + maxStep = range_->max(); + } - auto gather = [this, pathCnt](std::vector> results) mutable -> Status { - auto& current = paths_.back(); - size_t mapCnt = 0; - for (auto& r : results) { - if (!r.ok()) { - return r.status(); - } else { - mapCnt += r.value().newPaths.size(); - } - } - current.reserve(mapCnt); - for (auto& r : results) { - auto jobResult = std::move(r).value(); - pathCnt += jobResult.pathCnt; - auto& vids = jobResult.vids; - if (!vids.empty()) { - vids_.insert(std::make_move_iterator(vids.begin()), std::make_move_iterator(vids.end())); - } - for (auto& kv : jobResult.newPaths) { - auto& paths = current[kv.first]; - paths.insert(paths.end(), - std::make_move_iterator(kv.second.begin()), - std::make_move_iterator(kv.second.end())); + result_.colNames = traverse_->colNames(); + if (maxStep == 0) { + return finish(ResultBuilder().value(Value(std::move(result_))).build()); + } + if (FLAGS_max_job_size <= 1) { + for (const auto& vid : initVids_) { + auto paths = buildPath(vid, minStep, maxStep); + if (paths.empty()) { + continue; } + result_.rows.insert(result_.rows.end(), + std::make_move_iterator(paths.begin()), + std::make_move_iterator(paths.end())); } - releasePrevPaths(pathCnt); - return Status::OK(); - }; - - return runMultiJobs(std::move(scatter), std::move(gather), iter.get()); + return finish(ResultBuilder().value(Value(std::move(result_))).build()); + } + return buildPathMultiJobs(minStep, maxStep); } -StatusOr TraverseExecutor::handleJob(size_t begin, - size_t end, - Iterator* iter, - const HashMap& prev) { - // Handle edges from begin to end, [begin, end) - JobResult jobResult; - size_t& pathCnt = jobResult.pathCnt; - auto& vids = jobResult.vids; - QueryExpressionContext ctx(ectx_); - auto* vFilter = traverse_->vFilter() ? traverse_->vFilter()->clone() : nullptr; - auto* eFilter = traverse_->eFilter() ? traverse_->eFilter()->clone() : nullptr; - auto& current = jobResult.newPaths; - for (; iter->valid() && begin++ < end; iter->next()) { - if (vFilter != nullptr && currentStep_ == 1) { - const auto& vFilterVal = vFilter->eval(ctx(iter)); - if (!vFilterVal.isBool() || !vFilterVal.getBool()) { +folly::Future TraverseExecutor::buildPathMultiJobs(size_t minStep, size_t maxStep) { + DataSet vids; + vids.rows.reserve(initVids_.size()); + for (auto& vid : initVids_) { + Row row; + row.values.emplace_back(std::move(vid)); + vids.rows.emplace_back(std::move(row)); + } + auto val = std::make_shared(std::move(vids)); + auto iter = std::make_unique(val); + + auto scatter = [this, minStep, maxStep]( + size_t begin, size_t end, Iterator* tmpIter) mutable -> std::vector { + std::vector rows; + for (; tmpIter->valid() && begin++ < end; tmpIter->next()) { + auto& vid = tmpIter->getColumn(0); + auto paths = buildPath(vid, minStep, maxStep); + if (paths.empty()) { continue; } + rows.insert( + rows.end(), std::make_move_iterator(paths.begin()), std::make_move_iterator(paths.end())); } - if (eFilter != nullptr) { - const auto& eFilterVal = eFilter->eval(ctx(iter)); - if (!eFilterVal.isBool() || !eFilterVal.getBool()) { + return rows; + }; + + auto gather = [this](std::vector> resp) mutable -> Status { + for (auto& rows : resp) { + if (rows.empty()) { continue; } + result_.rows.insert(result_.rows.end(), + std::make_move_iterator(rows.begin()), + std::make_move_iterator(rows.end())); } - auto& dst = iter->getEdgeProp("*", kDst); - auto srcV = iter->getVertex(); - auto e = iter->getEdge(); - // Join on dst = src - auto pathToSrcFound = prev.find(srcV.getVertex().vid); - if (pathToSrcFound == prev.end()) { - return Status::Error("Can't find prev paths."); - } - const auto& paths = pathToSrcFound->second; - for (auto& prevPath : paths) { - if (hasSameEdge(prevPath, e.getEdge())) { - continue; - } - vids.emplace(dst); - if (currentStep_ == 1) { - Row path; - if (traverse_->trackPrevPath()) { - path = prevPath; - } - path.values.emplace_back(srcV); - List neighbors; - neighbors.values.emplace_back(e); - path.values.emplace_back(std::move(neighbors)); - buildPath(current, dst, std::move(path)); - ++pathCnt; - } else { - auto path = prevPath; - auto& eList = path.values.back().mutableList().values; - eList.emplace_back(srcV); - eList.emplace_back(e); - buildPath(current, dst, std::move(path)); - ++pathCnt; - } - } // `prevPath' - } // `iter' - return jobResult; -} + finish(ResultBuilder().value(Value(std::move(result_))).build()); + return Status::OK(); + }; -void TraverseExecutor::buildPath(HashMap& currentPaths, const Value& dst, Row&& path) { - auto pathToDstFound = currentPaths.find(dst); - if (pathToDstFound == currentPaths.end()) { - Paths interimPaths; - interimPaths.emplace_back(std::move(path)); - currentPaths.emplace(dst, std::move(interimPaths)); - } else { - auto& interimPaths = pathToDstFound->second; - interimPaths.emplace_back(std::move(path)); - } + return runMultiJobs(std::move(scatter), std::move(gather), iter.get()); } -Status TraverseExecutor::buildResult() { - // This means we are reaching a dead end, return empty. - if (range_ != nullptr && currentStep_ < range_->min()) { - return finish(ResultBuilder().value(Value(DataSet())).build()); +// build path based on BFS through adjancency list +std::vector TraverseExecutor::buildPath(const Value& vid, size_t minStep, size_t maxStep) { + auto vidIter = adjList_.find(vid); + if (vidIter == adjList_.end()) { + return std::vector(); } - - DataSet result; - result.colNames = traverse_->colNames(); - result.rows.reserve(totalPathCnt_); - for (auto& currentStepPaths : paths_) { - for (auto& paths : currentStepPaths) { - std::move(paths.second.begin(), paths.second.end(), std::back_inserter(result.rows)); - } + auto& src = vidIter->first; + auto& adjEdges = vidIter->second; + if (adjEdges.empty()) { + return std::vector(); } - return finish(ResultBuilder().value(Value(std::move(result))).build()); -} + std::vector result; + result.reserve(adjEdges.size()); + for (auto& edge : adjEdges) { + List edgeList; + edgeList.values.emplace_back(edge); + Row row; + row.values.emplace_back(src); + row.values.emplace_back(std::move(edgeList)); + result.emplace_back(std::move(row)); + } -bool TraverseExecutor::hasSameEdge(const Row& prevPath, const Edge& currentEdge) { - for (const auto& v : prevPath.values) { - if (v.isList()) { - for (const auto& e : v.getList().values) { - if (e.isEdge() && e.getEdge().keyEqual(currentEdge)) { - return true; + if (maxStep == 1) { + if (traverse_->trackPrevPath()) { + std::vector newResult; + auto dstIter = dst2PathsMap_.find(vid); + if (dstIter == dst2PathsMap_.end()) { + return std::vector(); + } + auto& prevPaths = dstIter->second; + for (auto& prevPath : prevPaths) { + for (auto& p : result) { + if (filterSameEdge(prevPath, p)) { + continue; + } + // copy + Row row = prevPath; + row.values.emplace_back(p.values.front()); + row.values.emplace_back(p.values.back()); + newResult.emplace_back(std::move(row)); } } + return newResult; + } else { + return result; } } - return false; -} -void TraverseExecutor::releasePrevPaths(size_t cnt) { - time::Duration dur; - if (range_ != nullptr) { - if (currentStep_ == range_->min() && paths_.size() > 1) { - auto rangeEnd = paths_.begin(); - std::advance(rangeEnd, paths_.size() - 1); - paths_.erase(paths_.begin(), rangeEnd); - } else if (range_->min() == 0 && currentStep_ == 1 && paths_.size() > 1) { - paths_.pop_front(); + size_t step = 2; + std::vector newResult; + std::queue*> queue; + std::list>> holder; + for (auto& edge : adjEdges) { + auto ptr = std::make_unique>(std::vector({edge})); + queue.emplace(ptr.get()); + holder.emplace_back(std::move(ptr)); + } + size_t adjSize = queue.size(); + while (!queue.empty()) { + auto edgeListPtr = queue.front(); + auto& dst = edgeListPtr->back().getEdge().dst; + queue.pop(); + --adjSize; + auto dstIter = adjList_.find(dst); + if (dstIter == adjList_.end()) { + if (adjSize == 0) { + if (++step > maxStep) { + break; + } + adjSize = queue.size(); + } + continue; } - if (currentStep_ >= range_->min()) { - totalPathCnt_ += cnt; + auto& adjedges = dstIter->second; + for (auto& edge : adjedges) { + if (hasSameEdge(*edgeListPtr, edge.getEdge())) { + continue; + } + auto newEdgeListPtr = std::make_unique>(*edgeListPtr); + newEdgeListPtr->emplace_back(dstIter->first); + newEdgeListPtr->emplace_back(edge); + + if (step >= minStep) { + Row row; + row.values.emplace_back(src); + row.values.emplace_back(List(*newEdgeListPtr)); + newResult.emplace_back(std::move(row)); + } + queue.emplace(newEdgeListPtr.get()); + holder.emplace_back(std::move(newEdgeListPtr)); + } + if (adjSize == 0) { + if (++step > maxStep) { + break; + } + adjSize = queue.size(); + } + } + if (minStep <= 1) { + newResult.insert(newResult.begin(), + std::make_move_iterator(result.begin()), + std::make_move_iterator(result.end())); + } + if (traverse_->trackPrevPath()) { + std::vector newPaths; + auto dstIter = dst2PathsMap_.find(vid); + if (dstIter != dst2PathsMap_.end()) { + auto& prevPaths = dstIter->second; + for (auto& prevPath : prevPaths) { + for (auto& p : newResult) { + if (filterSameEdge(prevPath, p)) { + continue; + } + // copy + Row row = prevPath; + row.values.emplace_back(p.values.front()); + row.values.emplace_back(p.values.back()); + newPaths.emplace_back(std::move(row)); + } + } + return newPaths; + } else { + return std::vector(); } } else { - paths_.pop_front(); - totalPathCnt_ = cnt; + return newResult; } } -Status TraverseExecutor::handleZeroStep(const HashMap& prev, - List&& vertices, - HashMap& zeroSteps, - size_t& count) { - HashSet uniqueSrc; - for (auto& srcV : vertices.values) { - auto src = srcV.getVertex().vid; - if (!uniqueSrc.emplace(src).second) { +bool TraverseExecutor::hasSameEdge(const std::vector& edgeList, const Edge& edge) { + for (auto& leftEdge : edgeList) { + if (!leftEdge.isEdge()) { continue; } - auto pathToSrcFound = prev.find(src); - if (pathToSrcFound == prev.end()) { - return Status::Error("Can't find prev paths."); + if (edge.keyEqual(leftEdge.getEdge())) { + return true; } - const auto& paths = pathToSrcFound->second; - for (auto& p : paths) { - Row path; - if (traverse_->trackPrevPath()) { - path = p; + } + return false; +} + +bool TraverseExecutor::filterSameEdge(const Row& lhs, + const Row& rhs, + std::unordered_set* uniqueEdge) { + if (uniqueEdge) { + for (auto& rightListVal : rhs.values) { + if (!rightListVal.isList()) { + continue; + } + auto& rightList = rightListVal.getList().values; + for (auto& edgeVal : rightList) { + if (!edgeVal.isEdge()) { + continue; + } + if (uniqueEdge->find(edgeVal) != uniqueEdge->end()) { + return true; + } + } + } + return false; + } else { + for (auto& leftListVal : lhs.values) { + if (!leftListVal.isList()) { + continue; + } + auto& leftList = leftListVal.getList().values; + for (auto& rightListVal : rhs.values) { + if (!rightListVal.isList()) { + continue; + } + auto& rightList = rightListVal.getList().values; + for (auto& edgeVal : rightList) { + if (!edgeVal.isEdge()) { + continue; + } + if (hasSameEdge(leftList, edgeVal.getEdge())) { + return true; + } + } } - path.values.emplace_back(srcV); - List neighbors; - neighbors.values.emplace_back(srcV); - path.values.emplace_back(std::move(neighbors)); - buildPath(zeroSteps, src, std::move(path)); - ++count; } + return false; } - return Status::OK(); } + } // namespace graph } // namespace nebula diff --git a/src/graph/executor/query/TraverseExecutor.h b/src/graph/executor/query/TraverseExecutor.h index 424945eec5c..c5a62a25fbb 100644 --- a/src/graph/executor/query/TraverseExecutor.h +++ b/src/graph/executor/query/TraverseExecutor.h @@ -39,19 +39,6 @@ namespace nebula { namespace graph { using RpcResponse = storage::StorageRpcResponse; -using Dst = Value; -using Paths = std::vector; -using HashSet = robin_hood::unordered_flat_set>; -using HashMap = robin_hood::unordered_flat_map>; - -struct JobResult { - // Newly traversed paths size - size_t pathCnt{0}; - // Request vids for next traverse - HashSet vids; - // Newly traversed paths - HashMap newPaths; -}; class TraverseExecutor final : public StorageAccessExecutor { public: @@ -62,54 +49,92 @@ class TraverseExecutor final : public StorageAccessExecutor { folly::Future execute() override; - Status close() override; - private: Status buildRequestVids(); - folly::Future traverse(); - void addStats(RpcResponse& resps, int64_t getNbrTimeInUSec); folly::Future getNeighbors(); - folly::Future handleResponse(RpcResponse&& resps); + void expand(GetNeighborsIter* iter); - Status buildInterimPath(GetNeighborsIter* iter); + folly::Future handleResponse(RpcResponse&& resps); - folly::Future buildInterimPathMultiJobs(std::unique_ptr iter); + folly::Future buildResult(); - StatusOr handleJob(size_t begin, size_t end, Iterator* iter, const HashMap& prev); + std::vector buildPath(const Value& vid, size_t minStep, size_t maxStep); - Status buildResult(); + folly::Future buildPathMultiJobs(size_t minStep, size_t maxStep); bool isFinalStep() const { return (range_ == nullptr && currentStep_ == 1) || (range_ != nullptr && (currentStep_ == range_->max() || range_->max() == 0)); } - bool zeroStep() const { - return node_->asNode()->zeroStep(); - } - - bool hasSameEdge(const Row& prevPath, const Edge& currentEdge); - - void releasePrevPaths(size_t cnt); + bool filterSameEdge(const Row& lhs, + const Row& rhs, + std::unordered_set* uniqueEdge = nullptr); - void buildPath(HashMap& currentPaths, const Value& dst, Row&& path); + bool hasSameEdge(const std::vector& edgeList, const Edge& edge); - Status handleZeroStep(const HashMap& prev, List&& vertices, HashMap& zeroSteps, size_t& count); + std::vector buildZeroStepPath(GetNeighborsIter* iter); Expression* selectFilter(); + struct VertexHash { + std::size_t operator()(const Value& v) const { + switch (v.type()) { + case Value::Type::VERTEX: { + auto& vid = v.getVertex().vid; + if (vid.type() == Value::Type::STRING) { + return std::hash()(vid.getStr()); + } else { + return vid.getInt(); + } + } + case Value::Type::STRING: { + return std::hash()(v.getStr()); + } + case Value::Type::INT: { + return v.getInt(); + } + default: { + return v.hash(); + } + } + } + }; + + struct VertexEqual { + bool operator()(const Value& lhs, const Value& rhs) const { + if (lhs.type() == rhs.type()) { + if (lhs.isVertex()) { + return lhs.getVertex().vid == rhs.getVertex().vid; + } + return lhs == rhs; + } + if (lhs.type() == Value::Type::VERTEX) { + return lhs.getVertex().vid == rhs; + } + if (rhs.type() == Value::Type::VERTEX) { + return lhs == rhs.getVertex().vid; + } + return lhs == rhs; + } + }; + private: ObjectPool objPool_; - HashSet vids_; + + std::vector vids_; + std::vector initVids_; + DataSet result_; + // Key : vertex Value : adjacent edges + std::unordered_map, VertexHash, VertexEqual> adjList_; + std::unordered_map, VertexHash, VertexEqual> dst2PathsMap_; const Traverse* traverse_{nullptr}; MatchStepRange* range_{nullptr}; size_t currentStep_{0}; - std::list paths_; - size_t totalPathCnt_{0}; }; } // namespace graph diff --git a/tests/tck/features/optimizer/PrunePropertiesRule.feature b/tests/tck/features/optimizer/PrunePropertiesRule.feature index 85b906f7bb3..c278ee8492d 100644 --- a/tests/tck/features/optimizer/PrunePropertiesRule.feature +++ b/tests/tck/features/optimizer/PrunePropertiesRule.feature @@ -747,74 +747,74 @@ Feature: Prune Properties rule When executing query: """ match (src_v:player{name:"Manu Ginobili"})-[e*2]-(dst_v) - return properties(src_v).sex,properties(e[0]).degree,properties(dst_v).name,src_v.player.age AS age, e[1].start_year,dst_v.player.age - order by age limit 5; + return properties(src_v).sex,properties(e[0]).degree as degree,properties(dst_v).name as name,src_v.player.age AS age, e[1].start_year,dst_v.player.age + order by degree, name, age limit 5; """ Then the result should be, in order, with relax comparison: - | properties(src_v).sex | properties(e[0]).degree | properties(dst_v).name | age | e[1].start_year | dst_v.player.age | - | "男" | 88 | "Danny Green" | 41 | 2010 | 31 | - | "男" | __NULL__ | "Danny Green" | 41 | 2022 | 31 | - | "男" | __NULL__ | "LeBron James" | 41 | 2022 | 34 | - | "男" | 88 | "Cory Joseph" | 41 | 2011 | 27 | - | "男" | __NULL__ | "76ers" | 41 | 2017 | NULL | + | properties(src_v).sex | degree | name | age | e[1].start_year | dst_v.player.age | + | "男" | 88 | "Aron Baynes" | 41 | 2013 | 32 | + | "男" | 88 | "Boris Diaw" | 41 | 2012 | 36 | + | "男" | 88 | "Cory Joseph" | 41 | 2011 | 27 | + | "男" | 88 | "Danny Green" | 41 | 2010 | 31 | + | "男" | 88 | "David West" | 41 | 2015 | 38 | When executing query: """ match (src_v:player{name:"Manu Ginobili"})-[e:like*2..3]-(dst_v) - return properties(src_v).sex,properties(e[0]).degree,properties(dst_v).name,src_v.player.age AS age, e[1].start_year,dst_v.player.age - order by age limit 5; + return properties(src_v).sex,properties(e[0]).degree as degree,properties(dst_v).name as name,src_v.player.age AS age, e[1].start_year,dst_v.player.age + order by degree, name, age limit 5; """ Then the result should be, in order, with relax comparison: - | properties(src_v).sex | properties(e[0]).degree | properties(dst_v).name | age | e[1].start_year | dst_v.player.age | - | "男" | __NULL__ | "Danny Green" | 41 | 2022 | 31 | - | "男" | __NULL__ | "Danny Green" | 41 | 2022 | 31 | - | "男" | __NULL__ | "Kyle Anderson" | 41 | 2022 | 25 | - | "男" | __NULL__ | "LeBron James" | 41 | 2022 | 34 | - | "男" | __NULL__ | "Kevin Durant" | 41 | 2022 | 30 | + | properties(src_v).sex | degree | name | age | e[1].start_year | dst_v.player.age | + | "男" | UNKNOWN_PROP | "Aron Baynes" | 41 | 2022 | 32 | + | "男" | UNKNOWN_PROP | "Aron Baynes" | 41 | 2022 | 32 | + | "男" | UNKNOWN_PROP | "Aron Baynes" | 41 | 2022 | 32 | + | "男" | UNKNOWN_PROP | "Aron Baynes" | 41 | 2022 | 32 | + | "男" | UNKNOWN_PROP | "Aron Baynes" | 41 | 2022 | 32 | When executing query: """ match (v1)-->(v2)-->(v3) where id(v1)=="Manu Ginobili" - return properties(v1).name,properties(v2).age AS age,properties(v3).name - order by age limit 1; + return properties(v1).name,properties(v2).age AS age,properties(v3).name as name + order by age, name limit 1; """ Then the result should be, in order, with relax comparison: - | properties(v1).name | age | properties(v3).name | - | "Manu Ginobili" | 36 | "Spurs" | + | properties(v1).name | age | name | + | "Manu Ginobili" | 36 | "Hornets" | When executing query: """ match (v1)-->(v2)-->(v3) where id(v1)=="Manu Ginobili" - return properties(v1).name,properties(v2).age AS age,properties(v3).name,v1.player.sex,v2.player.sex,id(v3) - order by age limit 1; + return properties(v1).name,properties(v2).age AS age,properties(v3).name as name,v1.player.sex,v2.player.sex,id(v3) + order by age, name limit 1; """ Then the result should be, in order, with relax comparison: - | properties(v1).name | age | properties(v3).name | v1.player.sex | v2.player.sex | id(v3) | - | "Manu Ginobili" | 36 | "Spurs" | "男" | "男" | "Spurs" | + | properties(v1).name | age | name | v1.player.sex | v2.player.sex | id(v3) | + | "Manu Ginobili" | 36 | "Hornets" | "男" | "男" | "Hornets" | When executing query: """ match (v1)-->(v2:player)-->(v3) where v2.player.name=="Shaquille O'Neal" - return properties(v1).name,properties(v2).age,properties(v3).name AS name,v2.player.sex,v1.player.age - order by name limit 1; + return properties(v1).name,properties(v2).age as age,properties(v3).name AS name,v2.player.sex,v1.player.age + order by age, name limit 1; """ Then the result should be, in order, with relax comparison: - | properties(v1).name | properties(v2).age | name | v2.player.sex | v1.player.age | - | "Yao Ming" | 47 | "Cavaliers" | "男" | 38 | + | properties(v1).name | age | name | v2.player.sex | v1.player.age | + | "Yao Ming" | 47 | "Cavaliers" | "男" | 38 | When executing query: """ match (v1)-->(v2:player)-->(v3) where v2.player.name=="Shaquille O'Neal" - return properties(v1).name,properties(v2).age,properties(v3).name AS name + return properties(v1).name,properties(v2).age as age,properties(v3).name AS name order by name limit 1; """ Then the result should be, in order, with relax comparison: - | properties(v1).name | properties(v2).age | name | - | "Yao Ming" | 47 | "Cavaliers" | + | properties(v1).name | age | name | + | "Yao Ming" | 47 | "Cavaliers" | When executing query: """ match (v1)-->(v2)-->(v3:team{name:"Celtics"}) - return properties(v1).name,properties(v2).age,properties(v3).name AS name - order by name limit 1; + return properties(v1).name,properties(v2).age as age,properties(v3).name AS name + order by name, age limit 1; """ Then the result should be, in order, with relax comparison: - | properties(v1).name | properties(v2).age | name | - | "Yao Ming" | 47 | "Celtics" | + | properties(v1).name | age | name | + | "Ray Allen" | 33 | "Celtics" | When executing query: """ match (src_v)-[e:like|serve]->(dst_v) where id(src_v)=="Rajon Rondo" @@ -829,16 +829,16 @@ Feature: Prune Properties rule When executing query: """ match (src_v)-[e:like|serve]->(dst_v)-[e2]-(dst_v2) where id(src_v)=="Rajon Rondo" - return properties(e).degree,properties(e2).degree AS degree - order by degree limit 5; + return properties(e).degree as degree,properties(e2).degree AS degree1 + order by degree, degree1 limit 5; """ Then the result should be, in order, with relax comparison: - | properties(e).degree | degree | - | 88 | 88 | - | __NULL__ | 88 | - | 88 | 88 | - | __NULL__ | 88 | - | 88 | 88 | + | degree | degree1 | + | 88 | 88 | + | 88 | 88 | + | 88 | 88 | + | 88 | 88 | + | 88 | 88 | When executing query: """ match (src_v)-[e:like|serve]->(dst_v)-[e2]-(dst_v2) where id(src_v)=="Rajon Rondo" return properties(e).degree1,properties(e).degree1,e2.a,dst_v.p.name,dst_v.player.sex1,properties(src_v).name2 limit 5;