From 4cf2b8507401c7146cb16b1292f575392219dda1 Mon Sep 17 00:00:00 2001 From: shylock <33566796+Shylock-Hg@users.noreply.github.com> Date: Fri, 20 May 2022 17:07:35 +0800 Subject: [PATCH] Move input rows of Traverse and AppendVertices. (#4176) Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- src/graph/context/Iterator.h | 18 +++++++++++++--- src/graph/executor/Executor.cpp | 21 +++++++++++++++++++ src/graph/executor/Executor.h | 4 ++++ .../executor/query/AppendVerticesExecutor.cpp | 3 ++- src/graph/executor/query/TraverseExecutor.cpp | 3 ++- 5 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/graph/context/Iterator.h b/src/graph/context/Iterator.h index 8a3a379255a..bde9341ea45 100644 --- a/src/graph/context/Iterator.h +++ b/src/graph/context/Iterator.h @@ -73,6 +73,8 @@ class Iterator { virtual const Row* row() const = 0; + virtual Row moveRow() = 0; + // erase range, no include last position, if last > size(), erase to the end // position virtual void eraseRange(size_t first, size_t last) = 0; @@ -228,6 +230,11 @@ class DefaultIter final : public Iterator { return nullptr; } + Row moveRow() override { + DLOG(FATAL) << "This method should not be invoked"; + return Row{}; + } + private: void doReset(size_t pos) override { DCHECK((pos == 0 && size() == 0) || (pos < size())); @@ -318,6 +325,10 @@ class GetNeighborsIter final : public Iterator { return currentEdge_; } + Row moveRow() override { + return std::move(*currentEdge_); + } + private: void doReset(size_t pos) override { UNUSED(pos); @@ -475,6 +486,10 @@ class SequentialIter : public Iterator { Value getEdge() const override; + Row moveRow() override { + return std::move(*iter_); + } + protected: const Row* row() const override { return &*iter_; @@ -484,9 +499,6 @@ class SequentialIter : public Iterator { friend class DataCollectExecutor; friend class AppendVerticesExecutor; friend class TraverseExecutor; - Row&& moveRow() { - return std::move(*iter_); - } void doReset(size_t pos) override; diff --git a/src/graph/executor/Executor.cpp b/src/graph/executor/Executor.cpp index 2bb80673e3c..5a7428aa06a 100644 --- a/src/graph/executor/Executor.cpp +++ b/src/graph/executor/Executor.cpp @@ -682,6 +682,27 @@ void Executor::dropBody(const PlanNode *body) { } } +bool Executor::movable(const Variable *var) { + // Only support input variables of current executor + DCHECK(std::find(node_->inputVars().begin(), node_->inputVars().end(), DCHECK_NOTNULL(var)) != + node_->inputVars().end()); + // TODO support executor in loop + if (node()->kind() == PlanNode::Kind::kLoop) { + return false; + } + if (node()->loopLayers() != 0) { + // The lifetime of loop body is managed by Loop node + return false; + } + + if (node()->kind() == PlanNode::Kind::kSelect) { + return false; + } + // Normal node + // Make sure drop happened-after count decrement + return var->userCount.load(std::memory_order_acquire) == 1; +} + Status Executor::finish(Result &&result) { if (!FLAGS_enable_lifetime_optimize || node()->outputVarPtr()->userCount.load(std::memory_order_relaxed) != 0) { diff --git a/src/graph/executor/Executor.h b/src/graph/executor/Executor.h index 6e9b932f86b..a2ba45888ac 100644 --- a/src/graph/executor/Executor.h +++ b/src/graph/executor/Executor.h @@ -94,6 +94,10 @@ class Executor : private boost::noncopyable, private cpp::NonMovable { void drop(const PlanNode *node); void dropBody(const PlanNode *body); + // Check whether the variable is movable, it's movable when reach end of lifetime + // This method shouldn't call after `finish` method! + bool movable(const Variable *var); + // Store the result of this executor to execution context Status finish(Result &&result); // Store the default result which not used for later executor diff --git a/src/graph/executor/query/AppendVerticesExecutor.cpp b/src/graph/executor/query/AppendVerticesExecutor.cpp index a7aa369ded4..d30c82eabe7 100644 --- a/src/graph/executor/query/AppendVerticesExecutor.cpp +++ b/src/graph/executor/query/AppendVerticesExecutor.cpp @@ -101,12 +101,13 @@ Status AppendVerticesExecutor::handleResp( } auto *src = av->src(); + bool mv = movable(av->inputVars().front()); for (; inputIter->valid(); inputIter->next()) { auto dstFound = map.find(src->eval(ctx(inputIter.get()))); if (dstFound == map.end()) { continue; } - Row row = *inputIter->row(); + Row row = mv ? inputIter->moveRow() : *inputIter->row(); row.values.emplace_back(dstFound->second); ds.rows.emplace_back(std::move(row)); } diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 0210a5ce6c5..c6cadc3edf7 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -48,6 +48,7 @@ Status TraverseExecutor::buildRequestDataSet() { auto* src = traverse_->src(); QueryExpressionContext ctx(ectx_); + bool mv = movable(traverse_->inputVars().front()); for (; iter->valid(); iter->next()) { auto vid = src->eval(ctx(iter)); if (!SchemaUtil::isValidVid(vid, vidType)) { @@ -56,7 +57,7 @@ Status TraverseExecutor::buildRequestDataSet() { continue; } // Need copy here, Argument executor may depends on this variable. - auto prePath = *iter->row(); + auto prePath = mv ? iter->moveRow() : *iter->row(); buildPath(prev, vid, std::move(prePath)); if (!uniqueSet.emplace(vid).second) { continue;