Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement/pattern expression #4916

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ struct Path final {
// "(v)-[:like]->()" in (v)-[:like]->()
std::string collectVariable;

// Flag for pattern predicate
bool isPred{false};
bool isAntiPred{false};

enum PathType : int8_t { kDefault, kAllShortest, kSingleShortest };
PathType pathType{PathType::kDefault};
};
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ nebula_add_library(
query/TraverseExecutor.cpp
query/AppendVerticesExecutor.cpp
query/RollUpApplyExecutor.cpp
query/PatternApplyExecutor.cpp
query/GetDstBySrcExecutor.cpp
algo/BFSShortestPathExecutor.cpp
algo/MultiShortestPathExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
#include "graph/executor/query/LeftJoinExecutor.h"
#include "graph/executor/query/LimitExecutor.h"
#include "graph/executor/query/MinusExecutor.h"
#include "graph/executor/query/PatternApplyExecutor.h"
#include "graph/executor/query/ProjectExecutor.h"
#include "graph/executor/query/RollUpApplyExecutor.h"
#include "graph/executor/query/SampleExecutor.h"
Expand Down Expand Up @@ -542,6 +543,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kRollUpApply: {
return pool->makeAndAdd<RollUpApplyExecutor>(node, qctx);
}
case PlanNode::Kind::kPatternApply: {
return pool->makeAndAdd<PatternApplyExecutor>(node, qctx);
}
case PlanNode::Kind::kArgument: {
return pool->makeAndAdd<ArgumentExecutor>(node, qctx);
}
Expand Down
154 changes: 154 additions & 0 deletions src/graph/executor/query/PatternApplyExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/executor/query/PatternApplyExecutor.h"

#include "graph/context/Iterator.h"
#include "graph/context/QueryExpressionContext.h"
#include "graph/planner/plan/Query.h"

namespace nebula {
namespace graph {

folly::Future<Status> PatternApplyExecutor::execute() {
SCOPED_TIMER(&execTime_);
return patternApply();
}

Status PatternApplyExecutor::checkBiInputDataSets() {
auto* patternApply = asNode<PatternApply>(node());
lhsIter_ = ectx_->getResult(patternApply->leftInputVar()).iter();
DCHECK(!!lhsIter_);
if (lhsIter_->isGetNeighborsIter() || lhsIter_->isDefaultIter()) {
std::stringstream ss;
ss << "PatternApply executor does not support " << lhsIter_->kind();
return Status::Error(ss.str());
}
rhsIter_ = ectx_->getResult(patternApply->rightInputVar()).iter();
DCHECK(!!rhsIter_);
if (rhsIter_->isGetNeighborsIter() || rhsIter_->isDefaultIter()) {
std::stringstream ss;
ss << "PatternApply executor does not support " << rhsIter_->kind();
return Status::Error(ss.str());
}
isAntiPred_ = patternApply->isAntiPredicate();

return Status::OK();
}

void PatternApplyExecutor::collectValidKeys(const std::vector<Expression*>& keyCols,
Iterator* iter,
std::unordered_set<List>& validKeys) const {
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
List list;
list.values.reserve(keyCols.size());
for (auto& col : keyCols) {
Value val = col->eval(ctx(iter));
list.values.emplace_back(std::move(val));
}
validKeys.emplace(std::move(list));
}
}

void PatternApplyExecutor::collectValidKey(Expression* keyCol,
Iterator* iter,
std::unordered_set<Value>& validKey) const {
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
auto& val = keyCol->eval(ctx(iter));
validKey.emplace(val);
}
}

DataSet PatternApplyExecutor::applyZeroKey(Iterator* appliedIter, const bool allValid) {
DataSet ds;
ds.rows.reserve(appliedIter->size());
QueryExpressionContext ctx(ectx_);
for (; appliedIter->valid(); appliedIter->next()) {
Row row = mv_ ? appliedIter->moveRow() : *appliedIter->row();
if (allValid) {
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

DataSet PatternApplyExecutor::applySingleKey(Expression* appliedKey,
Iterator* appliedIter,
const std::unordered_set<Value>& validKey) {
DataSet ds;
ds.rows.reserve(appliedIter->size());
QueryExpressionContext ctx(ectx_);
for (; appliedIter->valid(); appliedIter->next()) {
auto& val = appliedKey->eval(ctx(appliedIter));
bool applyFlag = (validKey.find(val) != validKey.end()) ^ isAntiPred_;
if (applyFlag) {
Row row = mv_ ? appliedIter->moveRow() : *appliedIter->row();
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

DataSet PatternApplyExecutor::applyMultiKey(std::vector<Expression*> appliedKeys,
Iterator* appliedIter,
const std::unordered_set<List>& validKeys) {
DataSet ds;
ds.rows.reserve(appliedIter->size());
QueryExpressionContext ctx(ectx_);
for (; appliedIter->valid(); appliedIter->next()) {
List list;
list.values.reserve(appliedKeys.size());
for (auto& col : appliedKeys) {
Value val = col->eval(ctx(appliedIter));
list.values.emplace_back(std::move(val));
}

bool applyFlag = (validKeys.find(list) != validKeys.end()) ^ isAntiPred_;
if (applyFlag) {
Row row = mv_ ? appliedIter->moveRow() : *appliedIter->row();
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

folly::Future<Status> PatternApplyExecutor::patternApply() {
auto* patternApplyNode = asNode<PatternApply>(node());
NG_RETURN_IF_ERROR(checkBiInputDataSets());

DataSet result;
mv_ = movable(node()->inputVars()[0]);
auto keyCols = patternApplyNode->keyCols();
if (keyCols.size() == 0) {
// Reverse the valid flag if the pattern predicate is an anti-predicate
applyZeroKey(lhsIter_.get(), (rhsIter_->size() > 0) ^ isAntiPred_);
} else if (keyCols.size() == 1) {
std::unordered_set<Value> validKey;
collectValidKey(keyCols[0]->clone(), rhsIter_.get(), validKey);
result = applySingleKey(keyCols[0]->clone(), lhsIter_.get(), validKey);
} else {
// Copy the keyCols to refresh the inside propIndex_ cache
auto cloneExpr = [](std::vector<Expression*> exprs) {
std::vector<Expression*> applyColsCopy;
applyColsCopy.reserve(exprs.size());
for (auto& expr : exprs) {
applyColsCopy.emplace_back(expr->clone());
}
return applyColsCopy;
};

std::unordered_set<List> validKeys;
collectValidKeys(cloneExpr(keyCols), rhsIter_.get(), validKeys);
result = applyMultiKey(cloneExpr(keyCols), lhsIter_.get(), validKeys);
}

result.colNames = patternApplyNode->colNames();
return finish(ResultBuilder().value(Value(std::move(result))).build());
}

} // namespace graph
} // namespace nebula
52 changes: 52 additions & 0 deletions src/graph/executor/query/PatternApplyExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include "graph/executor/Executor.h"

namespace nebula {
namespace graph {

class PatternApplyExecutor : public Executor {
public:
PatternApplyExecutor(const PlanNode* node, QueryContext* qctx)
: Executor("PatternApplyExecutor", node, qctx) {}

folly::Future<Status> execute() override;

protected:
Status checkBiInputDataSets();

void collectValidKeys(const std::vector<Expression*>& keyCols,
Iterator* iter,
std::unordered_set<List>& validKeys) const;

void collectValidKey(Expression* keyCol,
Iterator* iter,
std::unordered_set<Value>& validKey) const;

DataSet applyZeroKey(Iterator* appliedIter, const bool allValid);

DataSet applySingleKey(Expression* appliedCol,
Iterator* appliedIter,
const std::unordered_set<Value>& validKey);

DataSet applyMultiKey(std::vector<Expression*> appliedKeys,
Iterator* appliedIter,
const std::unordered_set<List>& validKeys);

folly::Future<Status> patternApply();
std::unique_ptr<Iterator> lhsIter_;
std::unique_ptr<Iterator> rhsIter_;

// Should apply the reverse when the pattern is an anti-predicate
bool isAntiPred_{false};
// Check if the apply side dataset movable
bool mv_{false};
};

} // namespace graph
} // namespace nebula
1 change: 1 addition & 0 deletions src/graph/optimizer/rule/RemoveNoopProjectRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const std::unordered_set<graph::PlanNode::Kind> RemoveNoopProjectRule::kQueries{
PlanNode::Kind::kHashInnerJoin,
PlanNode::Kind::kCrossJoin,
PlanNode::Kind::kRollUpApply,
PlanNode::Kind::kPatternApply,
PlanNode::Kind::kArgument,
};

Expand Down
6 changes: 5 additions & 1 deletion src/graph/planner/match/MatchPathPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ StatusOr<SubPlan> MatchPathPlanner::transform(WhereClauseContext* bindWhere,
NG_RETURN_IF_ERROR(findStarts(bindWhere, nodeAliasesSeen, startFromEdge, startIndex, subplan));
NG_RETURN_IF_ERROR(expand(startFromEdge, startIndex, subplan));

MatchSolver::buildProjectColumns(ctx_->qctx, path_, subplan);
// No need to actually build path if the path is just a predicate
if (!path_.isPred) {
MatchSolver::buildProjectColumns(ctx_->qctx, path_, subplan);
}

return subplan;
}

Expand Down
24 changes: 23 additions & 1 deletion src/graph/planner/match/SegmentsConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,34 @@ SubPlan SegmentsConnector::rollUpApply(CypherClauseContextBase* ctx,
return newPlan;
}

/*static*/ SubPlan SegmentsConnector::patternApply(CypherClauseContextBase* ctx,
const SubPlan& left,
const SubPlan& right,
const graph::Path& path) {
SubPlan newPlan = left;
auto qctx = ctx->qctx;
std::vector<Expression*> keyProps;
for (const auto& col : path.compareVariables) {
keyProps.emplace_back(FunctionCallExpression::make(
qctx->objPool(), "id", {InputPropertyExpression::make(qctx->objPool(), col)}));
}
auto* patternApply = PatternApply::make(
qctx, left.root, DCHECK_NOTNULL(right.root), std::move(keyProps), path.isAntiPred);
// Left side input may be nullptr, which will be filled later
std::vector<std::string> colNames =
left.root != nullptr ? left.root->colNames() : ctx->inputColNames;
patternApply->setColNames(std::move(colNames));
newPlan.root = patternApply;
newPlan.tail = (newPlan.tail == nullptr ? patternApply : newPlan.tail);
return newPlan;
}

SubPlan SegmentsConnector::addInput(const SubPlan& left, const SubPlan& right, bool copyColNames) {
if (left.root == nullptr) {
return right;
}
SubPlan newPlan = left;
DCHECK(left.root->isSingleInput());

if (left.tail->isSingleInput()) {
auto* mutableLeft = const_cast<PlanNode*>(left.tail);
auto* siLeft = static_cast<SingleInputNode*>(mutableLeft);
Expand Down
5 changes: 5 additions & 0 deletions src/graph/planner/match/SegmentsConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class SegmentsConnector final {
const SubPlan& right,
const graph::Path& path);

static SubPlan patternApply(CypherClauseContextBase* ctx,
const SubPlan& left,
const SubPlan& right,
const graph::Path& path);

/*
* left->right
*/
Expand Down
38 changes: 24 additions & 14 deletions src/graph/planner/match/WhereClausePlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,37 @@ StatusOr<SubPlan> WhereClausePlanner::transform(CypherClauseContextBase* ctx) {
}

auto* wctx = static_cast<WhereClauseContext*>(ctx);
SubPlan wherePlan;
if (wctx->filter) {
auto* newFilter = MatchSolver::doRewrite(wctx->qctx, wctx->aliasesAvailable, wctx->filter);
wherePlan.root = Filter::make(wctx->qctx, nullptr, newFilter, needStableFilter_);
wherePlan.tail = wherePlan.root;

SubPlan subPlan;
// Build plan for pattern from expression
SubPlan plan;
if (!wctx->paths.empty()) {
SubPlan pathsPlan;
// Build plan for pattern expression
for (auto& path : wctx->paths) {
auto status = MatchPathPlanner(wctx, path).transform(nullptr, {});
NG_RETURN_IF_ERROR(status);
subPlan = SegmentsConnector::rollUpApply(wctx, subPlan, std::move(status).value(), path);
}
if (subPlan.root != nullptr) {
wherePlan = SegmentsConnector::addInput(wherePlan, subPlan, true);
auto pathPlan = std::move(status).value();

if (path.isPred) {
// Build plan for pattern predicates
pathsPlan = SegmentsConnector::patternApply(wctx, pathsPlan, pathPlan, path);
} else {
pathsPlan = SegmentsConnector::rollUpApply(wctx, pathsPlan, pathPlan, path);
}
}
plan = pathsPlan;
}

return wherePlan;
if (wctx->filter) {
SubPlan wherePlan;
auto* newFilter = MatchSolver::doRewrite(wctx->qctx, wctx->aliasesAvailable, wctx->filter);
wherePlan.root = Filter::make(wctx->qctx, nullptr, newFilter, needStableFilter_);
wherePlan.tail = wherePlan.root;
if (plan.root == nullptr) {
return wherePlan;
}
plan = SegmentsConnector::addInput(wherePlan, plan, true);
}

return wherePlan;
return plan;
}
} // namespace graph
} // namespace nebula
2 changes: 2 additions & 0 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "Argument";
case Kind::kRollUpApply:
return "RollUpApply";
case Kind::kPatternApply:
return "PatternApply";
case Kind::kGetDstBySrc:
return "GetDstBySrc";
// no default so the compiler will warning when lack
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class PlanNode {
kHashInnerJoin,
kCrossJoin,
kRollUpApply,
kPatternApply,
kArgument,

// Logic
Expand Down
Loading