Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move input rows of Traverse and AppendVertices. #4176

Merged
merged 4 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/graph/context/Iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class Iterator {

virtual const Row* row() const = 0;

virtual Row moveRow() = 0;

// erase range, no include last position, if last > size(), erase to the end
// position
virtual void eraseRange(size_t first, size_t last) = 0;
Expand Down Expand Up @@ -228,6 +230,11 @@ class DefaultIter final : public Iterator {
return nullptr;
}

Row moveRow() override {
DLOG(FATAL) << "This method should not be invoked";
return Row{};
}

private:
void doReset(size_t pos) override {
DCHECK((pos == 0 && size() == 0) || (pos < size()));
Expand Down Expand Up @@ -318,6 +325,10 @@ class GetNeighborsIter final : public Iterator {
return currentEdge_;
}

Row moveRow() override {
return std::move(*currentEdge_);
}

private:
void doReset(size_t pos) override {
UNUSED(pos);
Expand Down Expand Up @@ -475,6 +486,10 @@ class SequentialIter : public Iterator {

Value getEdge() const override;

Row moveRow() override {
return std::move(*iter_);
}

protected:
const Row* row() const override {
return &*iter_;
Expand All @@ -484,9 +499,6 @@ class SequentialIter : public Iterator {
friend class DataCollectExecutor;
friend class AppendVerticesExecutor;
friend class TraverseExecutor;
Row&& moveRow() {
return std::move(*iter_);
}

void doReset(size_t pos) override;

Expand Down
21 changes: 21 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,27 @@ void Executor::dropBody(const PlanNode *body) {
}
}

bool Executor::movable(const Variable *var) {
// Only support input variables of current executor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the data flow and logical flow are different, such as bfsShortestPath operator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use reference count to identify it.

DCHECK(std::find(node_->inputVars().begin(), node_->inputVars().end(), DCHECK_NOTNULL(var)) !=
node_->inputVars().end());
// TODO support executor in loop
if (node()->kind() == PlanNode::Kind::kLoop) {
return false;
}
if (node()->loopLayers() != 0) {
// The lifetime of loop body is managed by Loop node
return false;
}

if (node()->kind() == PlanNode::Kind::kSelect) {
return false;
}
// Normal node
// Make sure drop happened-after count decrement
return var->userCount.load(std::memory_order_acquire) == 1;
}

Status Executor::finish(Result &&result) {
if (!FLAGS_enable_lifetime_optimize ||
node()->outputVarPtr()->userCount.load(std::memory_order_relaxed) != 0) {
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
void drop(const PlanNode *node);
void dropBody(const PlanNode *body);

// Check whether the variable is movable, it's movable when reach end of lifetime
// This method shouldn't call after `finish` method!
bool movable(const Variable *var);

// Store the result of this executor to execution context
Status finish(Result &&result);
// Store the default result which not used for later executor
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/query/AppendVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,13 @@ Status AppendVerticesExecutor::handleResp(
}

auto *src = av->src();
bool mv = movable(av->inputVars().front());
for (; inputIter->valid(); inputIter->next()) {
auto dstFound = map.find(src->eval(ctx(inputIter.get())));
if (dstFound == map.end()) {
continue;
}
Row row = *inputIter->row();
Row row = mv ? inputIter->moveRow() : *inputIter->row();
row.values.emplace_back(dstFound->second);
ds.rows.emplace_back(std::move(row));
}
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Status TraverseExecutor::buildRequestDataSet() {
auto* src = traverse_->src();
QueryExpressionContext ctx(ectx_);

bool mv = movable(traverse_->inputVars().front());
for (; iter->valid(); iter->next()) {
auto vid = src->eval(ctx(iter));
if (!SchemaUtil::isValidVid(vid, vidType)) {
Expand All @@ -56,7 +57,7 @@ Status TraverseExecutor::buildRequestDataSet() {
continue;
}
// Need copy here, Argument executor may depends on this variable.
auto prePath = *iter->row();
auto prePath = mv ? iter->moveRow() : *iter->row();
buildPath(prev, vid, std::move(prePath));
if (!uniqueSet.emplace(vid).second) {
continue;
Expand Down