Skip to content

Commit

Permalink
Merge branch 'master' into s2-index-params
Browse files Browse the repository at this point in the history
  • Loading branch information
czpmango authored Dec 21, 2021
2 parents c8f2669 + 9ea4889 commit db39393
Show file tree
Hide file tree
Showing 71 changed files with 747 additions and 234 deletions.
2 changes: 2 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,8 @@ void MetaClient::getResponse(Request req,
} else if (resp.get_code() == nebula::cpp2::ErrorCode::E_CLIENT_SERVER_INCOMPATIBLE) {
pro.setValue(respGen(std::move(resp)));
return;
} else if (resp.get_code() == nebula::cpp2::ErrorCode::E_MACHINE_NOT_FOUND) {
updateLeader();
}
pro.setValue(this->handleResponse(resp));
}); // then
Expand Down
1 change: 0 additions & 1 deletion src/common/expression/VariableExpression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const Value& VariableExpression::eval(ExpressionContext& ctx) { return ctx.getVa
void VariableExpression::accept(ExprVisitor* visitor) { visitor->visit(this); }

void VariableExpression::writeTo(Encoder& encoder) const {
DCHECK(isInner_);
encoder << kind_;
encoder << var_;
}
Expand Down
6 changes: 6 additions & 0 deletions src/graph/context/QueryContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ void QueryContext::init() {
objPool_ = std::make_unique<ObjectPool>();
ep_ = std::make_unique<ExecutionPlan>();
ectx_ = std::make_unique<ExecutionContext>();
// copy parameterMap into ExecutionContext
if (rctx_) {
for (auto item : rctx_->parameterMap()) {
ectx_->setValue(std::move(item.first), std::move(item.second));
}
}
idGen_ = std::make_unique<IdGenerator>(0);
symTable_ = std::make_unique<SymbolTable>(objPool_.get());
vctx_ = std::make_unique<ValidateContext>(std::make_unique<AnonVarGenerator>(symTable_.get()));
Expand Down
4 changes: 4 additions & 0 deletions src/graph/context/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ class QueryContext {

bool isKilled() const { return killed_.load(); }

bool existParameter(const std::string& param) const {
return ectx_->exist(param) && (ectx_->getValue(param).type() != Value::Type::DATASET);
}

private:
void init();

Expand Down
2 changes: 1 addition & 1 deletion src/graph/context/QueryExpressionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class QueryExpressionContext final : public ExpressionContext {

void setVar(const std::string&, Value val) override;

QueryExpressionContext& operator()(Iterator* iter) {
QueryExpressionContext& operator()(Iterator* iter = nullptr) {
iter_ = iter;
return *this;
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/AppendVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ folly::Future<Status> AppendVerticesExecutor::appendVertices() {
av->exprs(),
av->dedup(),
av->orderBy(),
av->limit(),
av->limit(qctx()),
av->filter())
.via(runner())
.ensure([this, getPropsTime]() {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ folly::Future<Status> GetEdgesExecutor::getEdges() {
ge->exprs(),
ge->dedup(),
ge->orderBy(),
ge->limit(),
ge->limit(qctx()),
ge->filter())
.via(runner())
.ensure([this, getPropsTime]() {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ folly::Future<Status> GetVerticesExecutor::getVertices() {
gv->exprs(),
gv->dedup(),
gv->orderBy(),
gv->limit(),
gv->limit(qctx()),
gv->filter())
.via(runner())
.ensure([this, getPropsTime]() {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/IndexScanExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ folly::Future<Status> IndexScanExecutor::indexScan() {
lookup->isEdge(),
lookup->schemaId(),
lookup->returnColumns(),
lookup->limit())
lookup->limit(qctx_))
.via(runner())
.thenValue([this](StorageRpcResponse<LookupIndexResp> &&rpcResp) {
addStats(rpcResp, otherStats_);
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void TraverseExecutor::getNeighbors() {
finalStep ? traverse_->dedup() : false,
finalStep ? traverse_->random() : false,
finalStep ? traverse_->orderBy() : std::vector<storage::cpp2::OrderBy>(),
finalStep ? traverse_->limit() : -1,
finalStep ? traverse_->limit(qctx()) : -1,
finalStep ? traverse_->filter() : nullptr)
.via(runner())
.thenValue([this, getNbrTime](StorageRpcResponse<GetNeighborsResponse>&& resp) mutable {
Expand Down
5 changes: 3 additions & 2 deletions src/graph/optimizer/OptimizerUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,16 @@ bool OptimizerUtils::relExprHasIndex(
}

void OptimizerUtils::copyIndexScanData(const nebula::graph::IndexScan* from,
nebula::graph::IndexScan* to) {
nebula::graph::IndexScan* to,
QueryContext* qctx) {
to->setEmptyResultSet(from->isEmptyResultSet());
to->setSpace(from->space());
to->setReturnCols(from->returnColumns());
to->setIsEdge(from->isEdge());
to->setSchemaId(from->schemaId());
to->setDedup(from->dedup());
to->setOrderBy(from->orderBy());
to->setLimit(from->limit());
to->setLimit(from->limit(qctx));
to->setFilter(from->filter() == nullptr ? nullptr : from->filter()->clone());
}

Expand Down
4 changes: 3 additions & 1 deletion src/graph/optimizer/OptimizerUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ class OptimizerUtils {
const Expression* expr,
const std::vector<std::shared_ptr<nebula::meta::cpp2::IndexItem>>& indexItems);

static void copyIndexScanData(const nebula::graph::IndexScan* from, nebula::graph::IndexScan* to);
static void copyIndexScanData(const nebula::graph::IndexScan* from,
nebula::graph::IndexScan* to,
QueryContext* qctx);
};

} // namespace graph
Expand Down
2 changes: 1 addition & 1 deletion src/graph/optimizer/rule/GeoPredicateIndexScanBaseRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ StatusOr<TransformResult> GeoPredicateIndexScanBaseRule::transform(
}

auto scanNode = IndexScan::make(ctx->qctx(), nullptr);
OptimizerUtils::copyIndexScanData(scan, scanNode);
OptimizerUtils::copyIndexScanData(scan, scanNode, ctx->qctx());
scanNode->setIndexQueryContext(std::move(idxCtxs));
// TODO(jie): geo predicate's calculation is a little heavy,
// which is not suitable to push down to the storage
Expand Down
2 changes: 1 addition & 1 deletion src/graph/optimizer/rule/IndexFullScanBaseRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ StatusOr<TransformResult> IndexFullScanBaseRule::transform(OptContext* ctx,
idxCtxs.emplace_back(std::move(ictx));

auto scanNode = this->scan(ctx, scan);
OptimizerUtils::copyIndexScanData(scan, scanNode);
OptimizerUtils::copyIndexScanData(scan, scanNode, ctx->qctx());
scanNode->setOutputVar(scan->outputVar());
scanNode->setColNames(scan->colNames());
scanNode->setIndexQueryContext(std::move(idxCtxs));
Expand Down
54 changes: 34 additions & 20 deletions src/graph/optimizer/rule/IndexScanRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
#include "graph/optimizer/rule/IndexScanRule.h"

#include "common/expression/LabelAttributeExpression.h"
#include "common/expression/VariableExpression.h"
#include "graph/context/QueryExpressionContext.h"
#include "graph/optimizer/OptContext.h"
#include "graph/optimizer/OptGroup.h"
#include "graph/optimizer/OptRule.h"
#include "graph/optimizer/OptimizerUtils.h"
#include "graph/planner/plan/PlanNode.h"
#include "graph/planner/plan/Query.h"
#include "graph/util/ExpressionUtils.h"
#include "graph/util/IndexUtil.h"
#include "graph/visitor/RewriteVisitor.h"

using nebula::graph::IndexScan;
using nebula::graph::IndexUtil;
Expand Down Expand Up @@ -63,7 +67,11 @@ StatusOr<OptRule::TransformResult> IndexScanRule::transform(OptContext* ctx,
} else {
FilterItems items;
ScanKind kind;
NG_RETURN_IF_ERROR(analyzeExpression(filter, &items, &kind, isEdge(groupNode)));
// rewrite ParameterExpression to ConstantExpression
// TODO: refactor index selector logic to avoid this rewriting
auto* newFilter = graph::ExpressionUtils::rewriteParameter(filter, qctx);

NG_RETURN_IF_ERROR(analyzeExpression(newFilter, &items, &kind, isEdge(groupNode), qctx));
auto status = createIndexQueryCtx(iqctx, kind, items, qctx, groupNode);
if (!status.ok()) {
NG_RETURN_IF_ERROR(createIndexQueryCtx(iqctx, qctx, groupNode));
Expand Down Expand Up @@ -118,8 +126,9 @@ Status IndexScanRule::createSingleIQC(IndexQueryCtx& iqctx,
return Status::IndexNotFound("No valid index found");
}
auto in = static_cast<const IndexScan*>(groupNode->node());
const auto& filter = in->queryContext().begin()->get_filter();
return appendIQCtx(index, items, iqctx, filter);
auto* filter = Expression::decode(qctx->objPool(), in->queryContext().begin()->get_filter());
auto* newFilter = graph::ExpressionUtils::rewriteParameter(filter, qctx);
return appendIQCtx(index, items, iqctx, newFilter);
}

Status IndexScanRule::createMultipleIQC(IndexQueryCtx& iqctx,
Expand All @@ -144,7 +153,7 @@ size_t IndexScanRule::hintCount(const FilterItems& items) const noexcept {
Status IndexScanRule::appendIQCtx(const IndexItem& index,
const FilterItems& items,
IndexQueryCtx& iqctx,
const std::string& filter) const {
const Expression* filter) const {
auto hc = hintCount(items);
auto fields = index->get_fields();
IndexQueryContext ctx;
Expand All @@ -165,7 +174,9 @@ Status IndexScanRule::appendIQCtx(const IndexItem& index,
});
if (it != filterItems.items.end()) {
// TODO (sky) : rewrite filter expr. NE expr should be add filter expr .
ctx.set_filter(filter);
if (filter != nullptr) {
ctx.set_filter(Expression::encode(*filter));
}
break;
}
NG_RETURN_IF_ERROR(appendColHint(hints, filterItems, field));
Expand All @@ -177,7 +188,9 @@ Status IndexScanRule::appendIQCtx(const IndexItem& index,
ctx.set_index_id(index->get_index_id());
if (hc > 0) {
// TODO (sky) : rewrite expr and set filter
ctx.set_filter(filter);
if (filter != nullptr) {
ctx.set_filter(Expression::encode(*filter));
}
}
ctx.set_column_hints(std::move(hints));
iqctx.emplace_back(std::move(ctx));
Expand Down Expand Up @@ -327,10 +340,8 @@ Expression* IndexScanRule::filterExpr(const OptGroupNode* groupNode) const {
return Expression::decode(pool, qct.begin()->get_filter());
}

Status IndexScanRule::analyzeExpression(Expression* expr,
FilterItems* items,
ScanKind* kind,
bool isEdge) const {
Status IndexScanRule::analyzeExpression(
Expression* expr, FilterItems* items, ScanKind* kind, bool isEdge, QueryContext* qctx) const {
// TODO (sky) : Currently only simple logical expressions are supported,
// such as all AND or all OR expressions, example :
// where c1 > 1 and c1 < 2 and c2 == 1
Expand All @@ -352,7 +363,7 @@ Status IndexScanRule::analyzeExpression(Expression* expr,
return Status::NotSupported("Condition not support yet : %s", expr->toString().c_str());
}
for (size_t i = 0; i < lExpr->operands().size(); ++i) {
NG_RETURN_IF_ERROR(analyzeExpression(lExpr->operand(i), items, kind, isEdge));
NG_RETURN_IF_ERROR(analyzeExpression(lExpr->operand(i), items, kind, isEdge, qctx));
}
break;
}
Expand All @@ -363,8 +374,8 @@ Status IndexScanRule::analyzeExpression(Expression* expr,
case Expression::Kind::kRelGT:
case Expression::Kind::kRelNE: {
auto* rExpr = static_cast<RelationalExpression*>(expr);
auto ret = isEdge ? addFilterItem<EdgePropertyExpression>(rExpr, items)
: addFilterItem<TagPropertyExpression>(rExpr, items);
auto ret = isEdge ? addFilterItem<EdgePropertyExpression>(rExpr, items, qctx)
: addFilterItem<TagPropertyExpression>(rExpr, items, qctx);
NG_RETURN_IF_ERROR(ret);
if (kind->getKind() == ScanKind::Kind::kMultipleScan &&
expr->kind() == Expression::Kind::kRelNE) {
Expand All @@ -381,19 +392,22 @@ Status IndexScanRule::analyzeExpression(Expression* expr,
}

template <typename E, typename>
Status IndexScanRule::addFilterItem(RelationalExpression* expr, FilterItems* items) const {
Status IndexScanRule::addFilterItem(RelationalExpression* expr,
FilterItems* items,
QueryContext* qctx) const {
// TODO (sky) : Check illegal filter. for example : where c1 == 1 and c1 == 2
auto relType = std::is_same<E, EdgePropertyExpression>::value ? Expression::Kind::kEdgeProperty
: Expression::Kind::kTagProperty;
if (expr->left()->kind() == relType && expr->right()->kind() == Expression::Kind::kConstant) {
if (expr->left()->kind() == relType &&
graph::ExpressionUtils::isEvaluableExpr(expr->right(), qctx)) {
auto* l = static_cast<const E*>(expr->left());
auto* r = static_cast<ConstantExpression*>(expr->right());
items->addItem(l->prop(), expr->kind(), r->value());
} else if (expr->left()->kind() == Expression::Kind::kConstant &&
auto rValue = expr->right()->eval(graph::QueryExpressionContext(qctx->ectx())());
items->addItem(l->prop(), expr->kind(), rValue);
} else if (graph::ExpressionUtils::isEvaluableExpr(expr->left(), qctx) &&
expr->right()->kind() == relType) {
auto* r = static_cast<const E*>(expr->right());
auto* l = static_cast<ConstantExpression*>(expr->left());
items->addItem(r->prop(), IndexUtil::reverseRelationalExprKind(expr->kind()), l->value());
auto lValue = expr->left()->eval(graph::QueryExpressionContext(qctx->ectx())());
items->addItem(r->prop(), IndexUtil::reverseRelationalExprKind(expr->kind()), lValue);
} else {
return Status::Error("Optimizer error, when rewrite relational expression");
}
Expand Down
7 changes: 4 additions & 3 deletions src/graph/optimizer/rule/IndexScanRule.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class IndexScanRule final : public OptRule {
Status appendIQCtx(const IndexItem& index,
const FilterItems& items,
std::vector<graph::IndexScan::IndexQueryContext>& iqctx,
const std::string& filter = "") const;
const Expression* filter = nullptr) const;

Status appendIQCtx(const IndexItem& index,
std::vector<graph::IndexScan::IndexQueryContext>& iqctx) const;
Expand All @@ -120,12 +120,13 @@ class IndexScanRule final : public OptRule {

Expression* filterExpr(const OptGroupNode* groupNode) const;

Status analyzeExpression(Expression* expr, FilterItems* items, ScanKind* kind, bool isEdge) const;
Status analyzeExpression(
Expression* expr, FilterItems* items, ScanKind* kind, bool isEdge, QueryContext* qctx) const;

template <typename E,
typename = std::enable_if_t<std::is_same<E, EdgePropertyExpression>::value ||
std::is_same<E, TagPropertyExpression>::value>>
Status addFilterItem(RelationalExpression* expr, FilterItems* items) const;
Status addFilterItem(RelationalExpression* expr, FilterItems* items, QueryContext* qctx) const;

IndexItem findOptimalIndex(graph::QueryContext* qctx,
const OptGroupNode* groupNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ EdgeIndexScan* makeEdgeIndexScan(QueryContext* qctx, const EdgeIndexScan* scan,
} else {
scanNode = EdgeIndexRangeScan::make(qctx, nullptr, scan->edgeType());
}
OptimizerUtils::copyIndexScanData(scan, scanNode);
OptimizerUtils::copyIndexScanData(scan, scanNode, qctx);
return scanNode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ TagIndexScan* makeTagIndexScan(QueryContext* qctx, const TagIndexScan* scan, boo
tagScan = TagIndexRangeScan::make(qctx, nullptr, scan->tagName());
}

OptimizerUtils::copyIndexScanData(scan, tagScan);
OptimizerUtils::copyIndexScanData(scan, tagScan, qctx);
return tagScan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ const Pattern &PushLimitDownEdgeIndexFullScanRule::pattern() const {

StatusOr<OptRule::TransformResult> PushLimitDownEdgeIndexFullScanRule::transform(
OptContext *octx, const MatchedResult &matched) const {
auto *qctx = octx->qctx();
auto limitGroupNode = matched.node;
auto indexScanGroupNode = matched.dependencies.front().node;

const auto limit = static_cast<const Limit *>(limitGroupNode->node());
const auto indexScan = static_cast<const EdgeIndexFullScan *>(indexScanGroupNode->node());

int64_t limitRows = limit->offset() + limit->count();
if (indexScan->limit() >= 0 && limitRows >= indexScan->limit()) {
int64_t limitRows = limit->offset() + limit->count(qctx);
if (indexScan->limit(qctx) >= 0 && limitRows >= indexScan->limit(qctx)) {
return TransformResult::noTransform();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ const Pattern &PushLimitDownEdgeIndexPrefixScanRule::pattern() const {

StatusOr<OptRule::TransformResult> PushLimitDownEdgeIndexPrefixScanRule::transform(
OptContext *octx, const MatchedResult &matched) const {
auto *qctx = octx->qctx();
auto limitGroupNode = matched.node;
auto indexScanGroupNode = matched.dependencies.front().node;

const auto limit = static_cast<const Limit *>(limitGroupNode->node());
const auto indexScan = static_cast<const EdgeIndexPrefixScan *>(indexScanGroupNode->node());

int64_t limitRows = limit->offset() + limit->count();
if (indexScan->limit() >= 0 && limitRows >= indexScan->limit()) {
int64_t limitRows = limit->offset() + limit->count(qctx);
if (indexScan->limit(qctx) >= 0 && limitRows >= indexScan->limit(qctx)) {
return TransformResult::noTransform();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ const Pattern &PushLimitDownEdgeIndexRangeScanRule::pattern() const {

StatusOr<OptRule::TransformResult> PushLimitDownEdgeIndexRangeScanRule::transform(
OptContext *octx, const MatchedResult &matched) const {
auto *qctx = octx->qctx();
auto limitGroupNode = matched.node;
auto indexScanGroupNode = matched.dependencies.front().node;

const auto limit = static_cast<const Limit *>(limitGroupNode->node());
const auto indexScan = static_cast<const EdgeIndexRangeScan *>(indexScanGroupNode->node());

int64_t limitRows = limit->offset() + limit->count();
if (indexScan->limit() >= 0 && limitRows >= indexScan->limit()) {
int64_t limitRows = limit->offset() + limit->count(qctx);
if (indexScan->limit(qctx) >= 0 && limitRows >= indexScan->limit(qctx)) {
return TransformResult::noTransform();
}

Expand Down
5 changes: 3 additions & 2 deletions src/graph/optimizer/rule/PushLimitDownGetNeighborsRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const Pattern &PushLimitDownGetNeighborsRule::pattern() const {

StatusOr<OptRule::TransformResult> PushLimitDownGetNeighborsRule::transform(
OptContext *octx, const MatchedResult &matched) const {
auto *qctx = octx->qctx();
auto limitGroupNode = matched.node;
auto gnGroupNode = matched.dependencies.front().node;

Expand All @@ -43,8 +44,8 @@ StatusOr<OptRule::TransformResult> PushLimitDownGetNeighborsRule::transform(
if (!graph::ExpressionUtils::isEvaluableExpr(limit->countExpr())) {
return TransformResult::noTransform();
}
int64_t limitRows = limit->offset() + limit->count();
if (gn->limit() >= 0 && limitRows >= gn->limit()) {
int64_t limitRows = limit->offset() + limit->count(qctx);
if (gn->limit(qctx) >= 0 && limitRows >= gn->limit(qctx)) {
return TransformResult::noTransform();
}

Expand Down
Loading

0 comments on commit db39393

Please sign in to comment.