Skip to content

Commit

Permalink
Merge branch 'master' into fix-AliasType
Browse files Browse the repository at this point in the history
  • Loading branch information
yixinglu authored Dec 7, 2022
2 parents 8e3e92c + d9752a0 commit 5285587
Show file tree
Hide file tree
Showing 19 changed files with 531 additions and 22 deletions.
4 changes: 4 additions & 0 deletions src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ struct Path final {
// "(v)-[:like]->()" in (v)-[:like]->()
std::string collectVariable;

// Flag for pattern predicate
bool isPred{false};
bool isAntiPred{false};

enum PathType : int8_t { kDefault, kAllShortest, kSingleShortest };
PathType pathType{PathType::kDefault};
};
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ nebula_add_library(
query/TraverseExecutor.cpp
query/AppendVerticesExecutor.cpp
query/RollUpApplyExecutor.cpp
query/PatternApplyExecutor.cpp
query/GetDstBySrcExecutor.cpp
algo/BFSShortestPathExecutor.cpp
algo/MultiShortestPathExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
#include "graph/executor/query/LeftJoinExecutor.h"
#include "graph/executor/query/LimitExecutor.h"
#include "graph/executor/query/MinusExecutor.h"
#include "graph/executor/query/PatternApplyExecutor.h"
#include "graph/executor/query/ProjectExecutor.h"
#include "graph/executor/query/RollUpApplyExecutor.h"
#include "graph/executor/query/SampleExecutor.h"
Expand Down Expand Up @@ -542,6 +543,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kRollUpApply: {
return pool->makeAndAdd<RollUpApplyExecutor>(node, qctx);
}
case PlanNode::Kind::kPatternApply: {
return pool->makeAndAdd<PatternApplyExecutor>(node, qctx);
}
case PlanNode::Kind::kArgument: {
return pool->makeAndAdd<ArgumentExecutor>(node, qctx);
}
Expand Down
154 changes: 154 additions & 0 deletions src/graph/executor/query/PatternApplyExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/executor/query/PatternApplyExecutor.h"

#include "graph/context/Iterator.h"
#include "graph/context/QueryExpressionContext.h"
#include "graph/planner/plan/Query.h"

namespace nebula {
namespace graph {

folly::Future<Status> PatternApplyExecutor::execute() {
SCOPED_TIMER(&execTime_);
return patternApply();
}

Status PatternApplyExecutor::checkBiInputDataSets() {
auto* patternApply = asNode<PatternApply>(node());
lhsIter_ = ectx_->getResult(patternApply->leftInputVar()).iter();
DCHECK(!!lhsIter_);
if (lhsIter_->isGetNeighborsIter() || lhsIter_->isDefaultIter()) {
std::stringstream ss;
ss << "PatternApply executor does not support " << lhsIter_->kind();
return Status::Error(ss.str());
}
rhsIter_ = ectx_->getResult(patternApply->rightInputVar()).iter();
DCHECK(!!rhsIter_);
if (rhsIter_->isGetNeighborsIter() || rhsIter_->isDefaultIter()) {
std::stringstream ss;
ss << "PatternApply executor does not support " << rhsIter_->kind();
return Status::Error(ss.str());
}
isAntiPred_ = patternApply->isAntiPredicate();

return Status::OK();
}

void PatternApplyExecutor::collectValidKeys(const std::vector<Expression*>& keyCols,
Iterator* iter,
std::unordered_set<List>& validKeys) const {
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
List list;
list.values.reserve(keyCols.size());
for (auto& col : keyCols) {
Value val = col->eval(ctx(iter));
list.values.emplace_back(std::move(val));
}
validKeys.emplace(std::move(list));
}
}

void PatternApplyExecutor::collectValidKey(Expression* keyCol,
Iterator* iter,
std::unordered_set<Value>& validKey) const {
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
auto& val = keyCol->eval(ctx(iter));
validKey.emplace(val);
}
}

DataSet PatternApplyExecutor::applyZeroKey(Iterator* appliedIter, const bool allValid) {
DataSet ds;
ds.rows.reserve(appliedIter->size());
QueryExpressionContext ctx(ectx_);
for (; appliedIter->valid(); appliedIter->next()) {
Row row = mv_ ? appliedIter->moveRow() : *appliedIter->row();
if (allValid) {
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

DataSet PatternApplyExecutor::applySingleKey(Expression* appliedKey,
Iterator* appliedIter,
const std::unordered_set<Value>& validKey) {
DataSet ds;
ds.rows.reserve(appliedIter->size());
QueryExpressionContext ctx(ectx_);
for (; appliedIter->valid(); appliedIter->next()) {
auto& val = appliedKey->eval(ctx(appliedIter));
bool applyFlag = (validKey.find(val) != validKey.end()) ^ isAntiPred_;
if (applyFlag) {
Row row = mv_ ? appliedIter->moveRow() : *appliedIter->row();
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

DataSet PatternApplyExecutor::applyMultiKey(std::vector<Expression*> appliedKeys,
Iterator* appliedIter,
const std::unordered_set<List>& validKeys) {
DataSet ds;
ds.rows.reserve(appliedIter->size());
QueryExpressionContext ctx(ectx_);
for (; appliedIter->valid(); appliedIter->next()) {
List list;
list.values.reserve(appliedKeys.size());
for (auto& col : appliedKeys) {
Value val = col->eval(ctx(appliedIter));
list.values.emplace_back(std::move(val));
}

bool applyFlag = (validKeys.find(list) != validKeys.end()) ^ isAntiPred_;
if (applyFlag) {
Row row = mv_ ? appliedIter->moveRow() : *appliedIter->row();
ds.rows.emplace_back(std::move(row));
}
}
return ds;
}

folly::Future<Status> PatternApplyExecutor::patternApply() {
auto* patternApplyNode = asNode<PatternApply>(node());
NG_RETURN_IF_ERROR(checkBiInputDataSets());

DataSet result;
mv_ = movable(node()->inputVars()[0]);
auto keyCols = patternApplyNode->keyCols();
if (keyCols.size() == 0) {
// Reverse the valid flag if the pattern predicate is an anti-predicate
applyZeroKey(lhsIter_.get(), (rhsIter_->size() > 0) ^ isAntiPred_);
} else if (keyCols.size() == 1) {
std::unordered_set<Value> validKey;
collectValidKey(keyCols[0]->clone(), rhsIter_.get(), validKey);
result = applySingleKey(keyCols[0]->clone(), lhsIter_.get(), validKey);
} else {
// Copy the keyCols to refresh the inside propIndex_ cache
auto cloneExpr = [](std::vector<Expression*> exprs) {
std::vector<Expression*> applyColsCopy;
applyColsCopy.reserve(exprs.size());
for (auto& expr : exprs) {
applyColsCopy.emplace_back(expr->clone());
}
return applyColsCopy;
};

std::unordered_set<List> validKeys;
collectValidKeys(cloneExpr(keyCols), rhsIter_.get(), validKeys);
result = applyMultiKey(cloneExpr(keyCols), lhsIter_.get(), validKeys);
}

result.colNames = patternApplyNode->colNames();
return finish(ResultBuilder().value(Value(std::move(result))).build());
}

} // namespace graph
} // namespace nebula
52 changes: 52 additions & 0 deletions src/graph/executor/query/PatternApplyExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include "graph/executor/Executor.h"

namespace nebula {
namespace graph {

class PatternApplyExecutor : public Executor {
public:
PatternApplyExecutor(const PlanNode* node, QueryContext* qctx)
: Executor("PatternApplyExecutor", node, qctx) {}

folly::Future<Status> execute() override;

protected:
Status checkBiInputDataSets();

void collectValidKeys(const std::vector<Expression*>& keyCols,
Iterator* iter,
std::unordered_set<List>& validKeys) const;

void collectValidKey(Expression* keyCol,
Iterator* iter,
std::unordered_set<Value>& validKey) const;

DataSet applyZeroKey(Iterator* appliedIter, const bool allValid);

DataSet applySingleKey(Expression* appliedCol,
Iterator* appliedIter,
const std::unordered_set<Value>& validKey);

DataSet applyMultiKey(std::vector<Expression*> appliedKeys,
Iterator* appliedIter,
const std::unordered_set<List>& validKeys);

folly::Future<Status> patternApply();
std::unique_ptr<Iterator> lhsIter_;
std::unique_ptr<Iterator> rhsIter_;

// Should apply the reverse when the pattern is an anti-predicate
bool isAntiPred_{false};
// Check if the apply side dataset movable
bool mv_{false};
};

} // namespace graph
} // namespace nebula
1 change: 1 addition & 0 deletions src/graph/optimizer/rule/RemoveNoopProjectRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const std::unordered_set<graph::PlanNode::Kind> RemoveNoopProjectRule::kQueries{
PlanNode::Kind::kHashInnerJoin,
PlanNode::Kind::kCrossJoin,
PlanNode::Kind::kRollUpApply,
PlanNode::Kind::kPatternApply,
PlanNode::Kind::kArgument,
};

Expand Down
6 changes: 5 additions & 1 deletion src/graph/planner/match/MatchPathPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ StatusOr<SubPlan> MatchPathPlanner::transform(WhereClauseContext* bindWhere,
NG_RETURN_IF_ERROR(findStarts(bindWhere, nodeAliasesSeen, startFromEdge, startIndex, subplan));
NG_RETURN_IF_ERROR(expand(startFromEdge, startIndex, subplan));

MatchSolver::buildProjectColumns(ctx_->qctx, path_, subplan);
// No need to actually build path if the path is just a predicate
if (!path_.isPred) {
MatchSolver::buildProjectColumns(ctx_->qctx, path_, subplan);
}

return subplan;
}

Expand Down
24 changes: 23 additions & 1 deletion src/graph/planner/match/SegmentsConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,34 @@ SubPlan SegmentsConnector::rollUpApply(CypherClauseContextBase* ctx,
return newPlan;
}

/*static*/ SubPlan SegmentsConnector::patternApply(CypherClauseContextBase* ctx,
const SubPlan& left,
const SubPlan& right,
const graph::Path& path) {
SubPlan newPlan = left;
auto qctx = ctx->qctx;
std::vector<Expression*> keyProps;
for (const auto& col : path.compareVariables) {
keyProps.emplace_back(FunctionCallExpression::make(
qctx->objPool(), "id", {InputPropertyExpression::make(qctx->objPool(), col)}));
}
auto* patternApply = PatternApply::make(
qctx, left.root, DCHECK_NOTNULL(right.root), std::move(keyProps), path.isAntiPred);
// Left side input may be nullptr, which will be filled later
std::vector<std::string> colNames =
left.root != nullptr ? left.root->colNames() : ctx->inputColNames;
patternApply->setColNames(std::move(colNames));
newPlan.root = patternApply;
newPlan.tail = (newPlan.tail == nullptr ? patternApply : newPlan.tail);
return newPlan;
}

SubPlan SegmentsConnector::addInput(const SubPlan& left, const SubPlan& right, bool copyColNames) {
if (left.root == nullptr) {
return right;
}
SubPlan newPlan = left;
DCHECK(left.root->isSingleInput());

if (left.tail->isSingleInput()) {
auto* mutableLeft = const_cast<PlanNode*>(left.tail);
auto* siLeft = static_cast<SingleInputNode*>(mutableLeft);
Expand Down
5 changes: 5 additions & 0 deletions src/graph/planner/match/SegmentsConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class SegmentsConnector final {
const SubPlan& right,
const graph::Path& path);

static SubPlan patternApply(CypherClauseContextBase* ctx,
const SubPlan& left,
const SubPlan& right,
const graph::Path& path);

/*
* left->right
*/
Expand Down
38 changes: 24 additions & 14 deletions src/graph/planner/match/WhereClausePlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,37 @@ StatusOr<SubPlan> WhereClausePlanner::transform(CypherClauseContextBase* ctx) {
}

auto* wctx = static_cast<WhereClauseContext*>(ctx);
SubPlan wherePlan;
if (wctx->filter) {
auto* newFilter = MatchSolver::doRewrite(wctx->qctx, wctx->aliasesAvailable, wctx->filter);
wherePlan.root = Filter::make(wctx->qctx, nullptr, newFilter, needStableFilter_);
wherePlan.tail = wherePlan.root;

SubPlan subPlan;
// Build plan for pattern from expression
SubPlan plan;
if (!wctx->paths.empty()) {
SubPlan pathsPlan;
// Build plan for pattern expression
for (auto& path : wctx->paths) {
auto status = MatchPathPlanner(wctx, path).transform(nullptr, {});
NG_RETURN_IF_ERROR(status);
subPlan = SegmentsConnector::rollUpApply(wctx, subPlan, std::move(status).value(), path);
}
if (subPlan.root != nullptr) {
wherePlan = SegmentsConnector::addInput(wherePlan, subPlan, true);
auto pathPlan = std::move(status).value();

if (path.isPred) {
// Build plan for pattern predicates
pathsPlan = SegmentsConnector::patternApply(wctx, pathsPlan, pathPlan, path);
} else {
pathsPlan = SegmentsConnector::rollUpApply(wctx, pathsPlan, pathPlan, path);
}
}
plan = pathsPlan;
}

return wherePlan;
if (wctx->filter) {
SubPlan wherePlan;
auto* newFilter = MatchSolver::doRewrite(wctx->qctx, wctx->aliasesAvailable, wctx->filter);
wherePlan.root = Filter::make(wctx->qctx, nullptr, newFilter, needStableFilter_);
wherePlan.tail = wherePlan.root;
if (plan.root == nullptr) {
return wherePlan;
}
plan = SegmentsConnector::addInput(wherePlan, plan, true);
}

return wherePlan;
return plan;
}
} // namespace graph
} // namespace nebula
2 changes: 2 additions & 0 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "Argument";
case Kind::kRollUpApply:
return "RollUpApply";
case Kind::kPatternApply:
return "PatternApply";
case Kind::kGetDstBySrc:
return "GetDstBySrc";
// no default so the compiler will warning when lack
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class PlanNode {
kHashInnerJoin,
kCrossJoin,
kRollUpApply,
kPatternApply,
kArgument,

// Logic
Expand Down
Loading

0 comments on commit 5285587

Please sign in to comment.