Skip to content

Commit

Permalink
Move input rows of Traverse and AppendVertices.
Browse files Browse the repository at this point in the history
  • Loading branch information
Shylock-Hg committed Apr 19, 2022
1 parent d6df28c commit 7fd1d38
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 5 deletions.
18 changes: 15 additions & 3 deletions src/graph/context/Iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class Iterator {

virtual const Row* row() const = 0;

virtual Row moveRow() = 0;

// erase range, no include last position, if last > size(), erase to the end
// position
virtual void eraseRange(size_t first, size_t last) = 0;
Expand Down Expand Up @@ -228,6 +230,11 @@ class DefaultIter final : public Iterator {
return nullptr;
}

Row moveRow() override {
DLOG(FATAL) << "This method should not be invoked";
return Row{};
}

private:
void doReset(size_t pos) override {
DCHECK((pos == 0 && size() == 0) || (pos < size()));
Expand Down Expand Up @@ -318,6 +325,10 @@ class GetNeighborsIter final : public Iterator {
return currentEdge_;
}

Row moveRow() override {
return std::move(*currentEdge_);
}

private:
void doReset(size_t pos) override {
UNUSED(pos);
Expand Down Expand Up @@ -475,6 +486,10 @@ class SequentialIter : public Iterator {

Value getEdge() const override;

Row moveRow() override {
return std::move(*iter_);
}

protected:
const Row* row() const override {
return &*iter_;
Expand All @@ -484,9 +499,6 @@ class SequentialIter : public Iterator {
friend class DataCollectExecutor;
friend class AppendVerticesExecutor;
friend class TraverseExecutor;
Row&& moveRow() {
return std::move(*iter_);
}

void doReset(size_t pos) override;

Expand Down
21 changes: 21 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,27 @@ void Executor::dropBody(const PlanNode *body) {
}
}

bool Executor::movable(const Variable *var) {
// Only support input variables of current executor
DCHECK(std::find(node_->inputVars().begin(), node_->inputVars().end(), DCHECK_NOTNULL(var)) !=
node_->inputVars().end());
// TODO support executor in loop
if (node()->kind() == PlanNode::Kind::kLoop) {
return false;
}
if (node()->loopLayers() != 0) {
// The lifetime of loop body is managed by Loop node
return false;
}

if (node()->kind() == PlanNode::Kind::kSelect) {
return false;
}
// Normal node
// Make sure drop happened-after count decrement
return var->userCount.load(std::memory_order_acquire) == 1;
}

Status Executor::finish(Result &&result) {
if (!FLAGS_enable_lifetime_optimize ||
node()->outputVarPtr()->userCount.load(std::memory_order_relaxed) != 0) {
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
void drop(const PlanNode *node);
void dropBody(const PlanNode *body);

// Check whether the variable is movable, it's movable when reach end of lifetime
// This method shouldn't call after `finish` method!
bool movable(const Variable *var);

// Store the result of this executor to execution context
Status finish(Result &&result);
// Store the default result which not used for later executor
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/query/AppendVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,13 @@ Status AppendVerticesExecutor::handleResp(
}

auto *src = av->src();
bool mv = movable(av->inputVars().front());
for (; inputIter->valid(); inputIter->next()) {
auto dstFound = map.find(src->eval(ctx(inputIter.get())));
if (dstFound == map.end()) {
continue;
}
Row row = *inputIter->row();
Row row = mv ? inputIter->moveRow() : *inputIter->row();
row.values.emplace_back(dstFound->second);
ds.rows.emplace_back(std::move(row));
}
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Status TraverseExecutor::buildRequestDataSet() {
auto* src = traverse_->src();
QueryExpressionContext ctx(ectx_);

bool mv = movable(traverse_->inputVars().front());
for (; iter->valid(); iter->next()) {
auto vid = src->eval(ctx(iter));
if (!SchemaUtil::isValidVid(vid, vidType)) {
Expand All @@ -56,7 +57,7 @@ Status TraverseExecutor::buildRequestDataSet() {
continue;
}
// Need copy here, Argument executor may depends on this variable.
auto prePath = *iter->row();
auto prePath = mv ? iter->moveRow() : *iter->row();
buildPath(prev, vid, std::move(prePath));
if (!uniqueSet.emplace(vid).second) {
continue;
Expand Down

0 comments on commit 7fd1d38

Please sign in to comment.