From beed8aa238b45716d96b21366c4b4b98db6c13ba Mon Sep 17 00:00:00 2001 From: jimingquan Date: Thu, 15 Sep 2022 14:17:56 +0800 Subject: [PATCH] fix error --- src/graph/context/ExecutionContext.cpp | 14 +++++ src/graph/context/ExecutionContext.h | 2 + src/graph/context/Iterator.cpp | 6 ++- src/graph/executor/algo/SubgraphExecutor.cpp | 51 ++++++++++++++++++- src/graph/executor/algo/SubgraphExecutor.h | 4 ++ .../executor/query/DataCollectExecutor.cpp | 18 +------ .../executor/query/GetNeighborsExecutor.cpp | 1 + 7 files changed, 77 insertions(+), 19 deletions(-) diff --git a/src/graph/context/ExecutionContext.cpp b/src/graph/context/ExecutionContext.cpp index 282bb24a8e7..ecbdcb54544 100644 --- a/src/graph/context/ExecutionContext.cpp +++ b/src/graph/context/ExecutionContext.cpp @@ -75,6 +75,20 @@ const Result& ExecutionContext::getResult(const std::string& name) const { } } +void ExecutionContext::setVersionedResult(const std::string& name, + Result&& result, + int64_t version) { + auto it = valueMap_.find(name); + if (it != valueMap_.end()) { + auto& hist = it->second; + auto size = hist.size(); + if (static_cast(std::abs(version)) >= size) { + return; + } + hist[(size + version - 1) % size] = std::move(result); + } +} + const Result& ExecutionContext::getVersionedResult(const std::string& name, int64_t version) const { auto& result = getHistory(name); auto size = result.size(); diff --git a/src/graph/context/ExecutionContext.h b/src/graph/context/ExecutionContext.h index 28aa31d7be5..d34acddc6a9 100644 --- a/src/graph/context/ExecutionContext.h +++ b/src/graph/context/ExecutionContext.h @@ -51,6 +51,8 @@ class ExecutionContext { const Result& getVersionedResult(const std::string& name, int64_t version) const; + void setVersionedResult(const std::string& name, Result&& result, int64_t version); + size_t numVersions(const std::string& name) const; // Return all existing history of the value. The front is the latest value diff --git a/src/graph/context/Iterator.cpp b/src/graph/context/Iterator.cpp index fc369192a69..b7b7b34f4e3 100644 --- a/src/graph/context/Iterator.cpp +++ b/src/graph/context/Iterator.cpp @@ -5,6 +5,8 @@ #include "graph/context/Iterator.h" +#include + #include "common/datatypes/Edge.h" #include "common/datatypes/Vertex.h" #include "common/memory/MemoryUtils.h" @@ -478,7 +480,6 @@ List GetNeighborsIter::getVertices() { for (currentRow_ = currentDs_->ds->rows.begin(); currentRow_ < currentDs_->ds->rows.end(); ++currentRow_) { vertices.values.emplace_back(getVertex()); - VLOG(1) << "vertex: " << getVertex() << " size: " << vertices.size(); } } reset(); @@ -549,8 +550,9 @@ List GetNeighborsIter::getEdges() { auto edge = getEdge(); if (edge.isEdge()) { const_cast(edge.getEdge()).format(); + DLOG(ERROR) << "dc edge: " << edge.toString(); + edges.values.emplace_back(std::move(edge)); } - edges.values.emplace_back(std::move(edge)); } reset(); return edges; diff --git a/src/graph/executor/algo/SubgraphExecutor.cpp b/src/graph/executor/algo/SubgraphExecutor.cpp index 15363d47aaa..e9bbfb014e5 100644 --- a/src/graph/executor/algo/SubgraphExecutor.cpp +++ b/src/graph/executor/algo/SubgraphExecutor.cpp @@ -51,7 +51,7 @@ folly::Future SubgraphExecutor::getNeighbors() { .via(runner()) .thenValue([this, getNbrTime](RpcResponse&& resp) mutable { // addStats(resp, getNbrTime.elapsedInUSec()); - vids_.clear(); + // vids_.clear(); return handleResponse(std::move(resp)); }); } @@ -71,6 +71,22 @@ folly::Future SubgraphExecutor::handleResponse(RpcResponse&& resps) { } list.values.emplace_back(std::move(*dataset)); } + + List vids(vids_); + DLOG(ERROR) << " step is " << currentStep_; + if (subgraph_->edgeFilter()) { + DLOG(ERROR) << "subgraph edge filter: " << subgraph_->edgeFilter()->toString(); + } else { + DLOG(ERROR) << "subgraph edge filter is nullptr"; + } + if (subgraph_->filter()) { + DLOG(ERROR) << "subgraph filter: " << subgraph_->filter()->toString(); + } else { + DLOG(ERROR) << " subgraph filter is nullptr"; + } + DLOG(ERROR) << "vids is " << vids.toString(); + DLOG(ERROR) << "getNeightbor result: " << list.toString(); + auto listVal = std::make_shared(std::move(list)); auto iter = std::make_unique(listVal); @@ -80,12 +96,36 @@ folly::Future SubgraphExecutor::handleResponse(RpcResponse&& resps) { } if (!process(std::move(iter)) || ++currentStep_ > steps) { + filterEdges(0); return folly::makeFuture(Status::OK()); } else { return getNeighbors(); } } +void SubgraphExecutor::filterEdges(int version) { + auto iter = ectx_->getVersionedResult(subgraph_->outputVar(), version).iter(); + auto* gnIter = static_cast(iter.get()); + while (gnIter->valid()) { + const auto& dst = gnIter->getEdgeProp("*", nebula::kDst); + if (validVids_.find(dst) == validVids_.end()) { + auto edge = gnIter->getEdge(); + gnIter->erase(); + DLOG(ERROR) << "erase dst " << dst.toString() << " edge is : " << edge.toString(); + } else { + gnIter->next(); + } + } + gnIter->reset(); + ResultBuilder builder; + builder.iter(std::move(iter)); + ectx_->setVersionedResult(subgraph_->outputVar(), builder.build(), version); + + std::vector vids(validVids_.begin(), validVids_.end()); + List list(vids); + DLOG(ERROR) << "pre Dst is : " << list; +} + bool SubgraphExecutor::process(std::unique_ptr iter) { auto gnSize = iter->size(); if (gnSize == 0) { @@ -105,9 +145,15 @@ bool SubgraphExecutor::process(std::unique_ptr iter) { } iter->reset(); } + vids_.clear(); auto& biDirectEdgeTypes = subgraph_->biDirectEdgeTypes(); while (iter->valid()) { const auto& dst = iter->getEdgeProp("*", nebula::kDst); + validVids_.emplace(iter->getColumn(0)); + // if (currentStep_ != 1 && preDsts.find(dst) == preDsts.end()) { + // iter->erase(); + // continue; + // } auto findIter = historyVids_.find(dst); if (findIter != historyVids_.end()) { if (biDirectEdgeTypes.empty()) { @@ -147,6 +193,9 @@ bool SubgraphExecutor::process(std::unique_ptr iter) { // update historyVids historyVids_.insert(std::make_move_iterator(currentVids.begin()), std::make_move_iterator(currentVids.end())); + if (currentStep_ != 1) { + filterEdges(-1); + } if (vids_.empty()) { return false; } diff --git a/src/graph/executor/algo/SubgraphExecutor.h b/src/graph/executor/algo/SubgraphExecutor.h index 62808e76849..1766ef64023 100644 --- a/src/graph/executor/algo/SubgraphExecutor.h +++ b/src/graph/executor/algo/SubgraphExecutor.h @@ -45,6 +45,7 @@ using RpcResponse = storage::StorageRpcResponse>; + using HashSet = robin_hood::unordered_flat_set>; SubgraphExecutor(const PlanNode* node, QueryContext* qctx) : StorageAccessExecutor("SubgraphExecutor", node, qctx) { @@ -57,6 +58,8 @@ class SubgraphExecutor : public StorageAccessExecutor { bool process(std::unique_ptr iter); + void filterEdges(int version); + folly::Future handleResponse(RpcResponse&& resps); private: @@ -65,6 +68,7 @@ class SubgraphExecutor : public StorageAccessExecutor { size_t currentStep_{1}; size_t totalSteps_{1}; std::vector vids_; + HashSet validVids_; }; } // namespace graph diff --git a/src/graph/executor/query/DataCollectExecutor.cpp b/src/graph/executor/query/DataCollectExecutor.cpp index b00efd2c0d1..c4440741860 100644 --- a/src/graph/executor/query/DataCollectExecutor.cpp +++ b/src/graph/executor/query/DataCollectExecutor.cpp @@ -72,27 +72,13 @@ Status DataCollectExecutor::collectSubgraph(const std::vector& vars bool notEmpty = false; for (const auto& type : colType) { if (type == Value::Type::VERTEX) { - auto originVertices = gnIter->getVertices(); - vertices.reserve(originVertices.size()); - for (auto& v : originVertices.values) { - if (UNLIKELY(!v.isVertex())) { - continue; - } - vertices.emplace_back(std::move(v)); - } + vertices = gnIter->getVertices(); if (!vertices.empty()) { notEmpty = true; row.emplace_back(std::move(vertices)); } } else { - auto originEdges = gnIter->getEdges(); - edges.reserve(originEdges.size()); - for (auto& edge : originEdges.values) { - if (UNLIKELY(!edge.isEdge())) { - continue; - } - edges.emplace_back(std::move(edge)); - } + edges = gnIter->getEdges(); if (!edges.empty()) { notEmpty = true; } diff --git a/src/graph/executor/query/GetNeighborsExecutor.cpp b/src/graph/executor/query/GetNeighborsExecutor.cpp index 986dcc9152e..40a2f6f2e2a 100644 --- a/src/graph/executor/query/GetNeighborsExecutor.cpp +++ b/src/graph/executor/query/GetNeighborsExecutor.cpp @@ -97,6 +97,7 @@ Status GetNeighborsExecutor::handleResponse(RpcResponse& resps) { list.values.emplace_back(std::move(*dataset)); } + DLOG(ERROR) << "getNeightbor result : " << list.toString(); builder.value(Value(std::move(list))).iter(Iterator::Kind::kGetNeighbors); return finish(builder.build()); }