Skip to content

Commit

Permalink
limit bug fix for indexScan
Browse files Browse the repository at this point in the history
  • Loading branch information
bright-starry-sky committed Sep 18, 2021
1 parent 07b5470 commit 1b1848e
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 65 deletions.
10 changes: 5 additions & 5 deletions src/storage/exec/IndexEdgeNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class IndexEdgeNode final : public RelNode<T> {
IndexScanNode<T>* indexScanNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
const std::string& schemaName,
int64_t limit)
int64_t limit = -1)
: context_(context),
indexScanNode_(indexScanNode),
schemas_(schemas),
Expand All @@ -42,11 +42,7 @@ class IndexEdgeNode final : public RelNode<T> {
data_.clear();
std::vector<storage::cpp2::EdgeKey> edges;
auto* iter = static_cast<EdgeIndexIterator*>(indexScanNode_->iterator());
int64_t count = 0;
while (iter && iter->valid()) {
if (limit_ > -1 && count++ == limit_) {
break;
}
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
Expand All @@ -66,6 +62,7 @@ class IndexEdgeNode final : public RelNode<T> {
edges.emplace_back(std::move(edge));
iter->next();
}
int64_t count = 1;
for (const auto& edge : edges) {
auto key = NebulaKeyUtils::edgeKey(context_->vIdLen(),
partId,
Expand All @@ -82,6 +79,9 @@ class IndexEdgeNode final : public RelNode<T> {
} else {
return ret;
}
if (limit_ > 0 && ++count > limit_) {
break;
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
Expand Down
39 changes: 29 additions & 10 deletions src/storage/exec/IndexFilterNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ class IndexFilterNode final : public RelNode<T> {
// data anymore.
IndexFilterNode(RuntimeContext* context,
IndexScanNode<T>* indexScanNode,
StorageExpressionContext* exprCtx = nullptr,
Expression* exp = nullptr,
bool isEdge = false)
StorageExpressionContext* exprCtx,
Expression* exp,
bool isEdge,
int64_t limit = -1)
: context_(context),
indexScanNode_(indexScanNode),
exprCtx_(exprCtx),
filterExp_(exp),
isEdge_(isEdge) {
isEdge_(isEdge),
limit_(limit) {
evalExprByIndex_ = true;
RelNode<T>::name_ = "IndexFilterNode";
}
Expand All @@ -42,9 +44,14 @@ class IndexFilterNode final : public RelNode<T> {
// need to read data.
IndexFilterNode(RuntimeContext* context,
IndexEdgeNode<T>* indexEdgeNode,
StorageExpressionContext* exprCtx = nullptr,
Expression* exp = nullptr)
: context_(context), indexEdgeNode_(indexEdgeNode), exprCtx_(exprCtx), filterExp_(exp) {
StorageExpressionContext* exprCtx,
Expression* exp,
int64_t limit = -1)
: context_(context),
indexEdgeNode_(indexEdgeNode),
exprCtx_(exprCtx),
filterExp_(exp),
limit_(limit) {
evalExprByIndex_ = false;
isEdge_ = true;
}
Expand All @@ -53,9 +60,14 @@ class IndexFilterNode final : public RelNode<T> {
// need to read data.
IndexFilterNode(RuntimeContext* context,
IndexVertexNode<T>* indexVertexNode,
StorageExpressionContext* exprCtx = nullptr,
Expression* exp = nullptr)
: context_(context), indexVertexNode_(indexVertexNode), exprCtx_(exprCtx), filterExp_(exp) {
StorageExpressionContext* exprCtx,
Expression* exp,
int64_t limit = -1)
: context_(context),
indexVertexNode_(indexVertexNode),
exprCtx_(exprCtx),
filterExp_(exp),
limit_(limit) {
evalExprByIndex_ = false;
isEdge_ = false;
}
Expand All @@ -74,13 +86,15 @@ class IndexFilterNode final : public RelNode<T> {
} else {
data = indexVertexNode_->moveData();
}
int64_t count = 1;
for (const auto& k : data) {
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
if (evalExprByIndex_) {
if (check(k.first)) {
data_.emplace_back(k.first, k.second);
count++;
}
} else {
const auto& schemas =
Expand All @@ -91,8 +105,12 @@ class IndexFilterNode final : public RelNode<T> {
}
if (check(reader.get(), k.first)) {
data_.emplace_back(k.first, k.second);
count++;
}
}
if (limit_ > 0 && count > limit_) {
break;
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
Expand Down Expand Up @@ -143,6 +161,7 @@ class IndexFilterNode final : public RelNode<T> {
Expression* filterExp_;
bool isEdge_;
bool evalExprByIndex_;
int64_t limit_;
std::vector<kvstore::KV> data_{};
};

Expand Down
10 changes: 5 additions & 5 deletions src/storage/exec/IndexScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class IndexScanNode : public RelNode<T> {
IndexScanNode(RuntimeContext* context,
IndexID indexId,
std::vector<cpp2::IndexColumnHint> columnHints,
int64_t limit)
int64_t limit = -1)
: context_(context), indexId_(indexId), columnHints_(std::move(columnHints)), limit_(limit) {
/**
* columnHints's elements are {scanType = PREFIX|RANGE; beginStr; endStr},
Expand Down Expand Up @@ -72,11 +72,8 @@ class IndexScanNode : public RelNode<T> {
auto* sh = context_->isEdge() ? context_->edgeSchema_ : context_->tagSchema_;
auto ttlProp = CommonUtils::ttlProps(sh);
data_.clear();
int64_t count = 0;
int64_t count = 1;
while (!!iter_ && iter_->valid()) {
if (limit_ > -1 && count++ == limit_) {
break;
}
if (context_->isPlanKilled()) {
return {};
}
Expand All @@ -89,6 +86,9 @@ class IndexScanNode : public RelNode<T> {
}
}
data_.emplace_back(iter_->key(), "");
if (limit_ > 0 && ++count > limit_) {
break;
}
iter_->next();
}
return std::move(data_);
Expand Down
11 changes: 6 additions & 5 deletions src/storage/exec/IndexVertexNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class IndexVertexNode final : public RelNode<T> {
IndexScanNode<T>* indexScanNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
const std::string& schemaName,
int64_t limit)
int64_t limit = -1)
: context_(context),
indexScanNode_(indexScanNode),
schemas_(schemas),
Expand All @@ -42,11 +42,8 @@ class IndexVertexNode final : public RelNode<T> {
data_.clear();
std::vector<VertexID> vids;
auto* iter = static_cast<VertexIndexIterator*>(indexScanNode_->iterator());
int64_t count = 0;

while (iter && iter->valid()) {
if (limit_ > -1 && count++ == limit_) {
break;
}
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
Expand All @@ -61,6 +58,7 @@ class IndexVertexNode final : public RelNode<T> {
vids.emplace_back(iter->vId());
iter->next();
}
int64_t count = 1;
for (const auto& vId : vids) {
VLOG(1) << "partId " << partId << ", vId " << vId << ", tagId " << context_->tagId_;
auto key = NebulaKeyUtils::vertexKey(context_->vIdLen(), partId, vId, context_->tagId_);
Expand All @@ -73,6 +71,9 @@ class IndexVertexNode final : public RelNode<T> {
} else {
return ret;
}
if (limit_ > 0 && ++count > limit_) {
break;
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
Expand Down
28 changes: 15 additions & 13 deletions src/storage/index/LookupBaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ template <typename REQ, typename RESP>
StatusOr<StoragePlan<IndexID>> LookupBaseProcessor<REQ, RESP>::buildPlan(
IndexFilterItem* filterItem, nebula::DataSet* result) {
StoragePlan<IndexID> plan;
// TODO(sky) : Limit is not supported yet for de-dup node.
// Related to paging scan, the de-dup execution plan needs to be refactored
auto deDup = std::make_unique<DeDupNode<IndexID>>(result, deDupColPos_);
int32_t filterId = 0;
std::unique_ptr<IndexOutputNode<IndexID>> out;
Expand Down Expand Up @@ -319,8 +321,8 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);
auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
if (context_->isEdge()) {
auto edge = std::make_unique<IndexEdgeNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->edgeName_, limit_);
Expand Down Expand Up @@ -370,11 +372,11 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);
auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));

auto filter = std::make_unique<IndexFilterNode<IndexID>>(
context_.get(), indexScan.get(), exprCtx, exp, context_->isEdge());
context_.get(), indexScan.get(), exprCtx, exp, context_->isEdge(), limit_);
filter->addDependency(indexScan.get());
auto output =
std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get(), true);
Expand Down Expand Up @@ -421,14 +423,14 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);
auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
if (context_->isEdge()) {
auto edge = std::make_unique<IndexEdgeNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->edgeName_, limit_);
context_.get(), indexScan.get(), schemas_, context_->edgeName_);
edge->addDependency(indexScan.get());
auto filter =
std::make_unique<IndexFilterNode<IndexID>>(context_.get(), edge.get(), exprCtx, exp);
auto filter = std::make_unique<IndexFilterNode<IndexID>>(
context_.get(), edge.get(), exprCtx, exp, limit_);
filter->addDependency(edge.get());

auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get());
Expand All @@ -439,10 +441,10 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
return output;
} else {
auto vertex = std::make_unique<IndexVertexNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->tagName_, limit_);
context_.get(), indexScan.get(), schemas_, context_->tagName_);
vertex->addDependency(indexScan.get());
auto filter =
std::make_unique<IndexFilterNode<IndexID>>(context_.get(), vertex.get(), exprCtx, exp);
auto filter = std::make_unique<IndexFilterNode<IndexID>>(
context_.get(), vertex.get(), exprCtx, exp, limit_);
filter->addDependency(vertex.get());

auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), filter.get());
Expand Down
5 changes: 5 additions & 0 deletions src/storage/index/LookupProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ void LookupProcessor::process(const cpp2::LookupIndexRequest& req) {

void LookupProcessor::doProcess(const cpp2::LookupIndexRequest& req) {
auto retCode = requestCheck(req);
if (limit_ == 0) {
onProcessFinished();
onFinished();
return;
}
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
for (auto& p : req.get_parts()) {
pushResultCode(retCode, p);
Expand Down
Loading

0 comments on commit 1b1848e

Please sign in to comment.