Skip to content

Commit

Permalink
Merge branch 'master' into enhancement/fit-GetNeighborsIter-size
Browse files Browse the repository at this point in the history
  • Loading branch information
Shylock-Hg authored May 9, 2022
2 parents a460832 + 1fd89af commit cca71c6
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 64 deletions.
4 changes: 4 additions & 0 deletions src/common/meta/NebulaSchemaProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ class NebulaSchemaProvider : public SchemaProviderIf {

StatusOr<std::pair<std::string, int64_t>> getTTLInfo() const;

bool hasNullableCol() const {
return numNullableFields_ != 0;
}

protected:
NebulaSchemaProvider() = default;

Expand Down
7 changes: 5 additions & 2 deletions src/graph/planner/ngql/SubgraphPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ StatusOr<SubPlan> SubgraphPlanner::nSteps(SubPlan& startVidPlan, const std::stri
subgraph->setOutputVar(input);
subgraph->setBiDirectEdgeTypes(subgraphCtx_->biDirectEdgeTypes);
subgraph->setColNames({nebula::kVid});

auto* condition = loopCondition(steps.steps() + 1, gn->outputVar());
uint32_t maxSteps = steps.steps();
if (subgraphCtx_->getEdgeProp || subgraphCtx_->withProp) {
++maxSteps;
}
auto* condition = loopCondition(maxSteps, gn->outputVar());
auto* loop = Loop::make(qctx, startVidPlan.root, subgraph, condition);

auto* dc = DataCollect::make(qctx, DataCollect::DCKind::kSubgraph);
Expand Down
5 changes: 3 additions & 2 deletions src/storage/exec/IndexEdgeScanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ IndexEdgeScanNode::IndexEdgeScanNode(const IndexEdgeScanNode& node)
IndexEdgeScanNode::IndexEdgeScanNode(RuntimeContext* context,
IndexID indexId,
const std::vector<cpp2::IndexColumnHint>& columnHint,
::nebula::kvstore::KVStore* kvstore)
: IndexScanNode(context, "IndexEdgeScanNode", indexId, columnHint, kvstore) {
::nebula::kvstore::KVStore* kvstore,
bool hasNullableCol)
: IndexScanNode(context, "IndexEdgeScanNode", indexId, columnHint, kvstore, hasNullableCol) {
getIndex = std::function([this](std::shared_ptr<IndexItem>& index) {
auto env = this->context_->env();
auto indexMgr = env->indexMan_;
Expand Down
3 changes: 2 additions & 1 deletion src/storage/exec/IndexEdgeScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class IndexEdgeScanNode : public IndexScanNode {
IndexEdgeScanNode(RuntimeContext* context,
IndexID indexId,
const std::vector<cpp2::IndexColumnHint>& columnHint,
::nebula::kvstore::KVStore* kvstore);
::nebula::kvstore::KVStore* kvstore,
bool hasNullableCol);
::nebula::cpp2::ErrorCode init(InitContext& ctx) override;
std::unique_ptr<IndexNode> copy() override;

Expand Down
7 changes: 5 additions & 2 deletions src/storage/exec/IndexScanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,9 @@ IndexScanNode::IndexScanNode(const IndexScanNode& node)
partId_(node.partId_),
indexId_(node.indexId_),
index_(node.index_),
indexNullable_(node.indexNullable_),
columnHints_(node.columnHints_),
kvstore_(node.kvstore_),
indexNullable_(node.indexNullable_),
requiredColumns_(node.requiredColumns_),
requiredAndHintColumns_(node.requiredAndHintColumns_),
ttlProps_(node.ttlProps_),
Expand Down Expand Up @@ -498,11 +498,14 @@ void IndexScanNode::decodePropFromIndex(folly::StringPiece key,
if (colPosMap.empty()) {
return;
}
// offset is the start posistion of index values
size_t offset = sizeof(PartitionID) + sizeof(IndexID);
std::bitset<16> nullableBit;
int8_t nullableColPosit = 15;
if (indexNullable_) {
auto bitOffset = key.size() - context_->vIdLen() - sizeof(uint16_t);
// key has been truncated ouside, it **ONLY** contains partId, indexId, indexValue and
// nullableBits. So the last two bytes is the nullableBits
auto bitOffset = key.size() - sizeof(uint16_t);
auto v = *reinterpret_cast<const uint16_t*>(key.data() + bitOffset);
nullableBit = v;
}
Expand Down
17 changes: 11 additions & 6 deletions src/storage/exec/IndexScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,13 @@ class IndexScanNode : public IndexNode {
const std::string& name,
IndexID indexId,
const std::vector<cpp2::IndexColumnHint>& columnHints,
::nebula::kvstore::KVStore* kvstore)
: IndexNode(context, name), indexId_(indexId), columnHints_(columnHints), kvstore_(kvstore) {}
::nebula::kvstore::KVStore* kvstore,
bool hasNullableCol)
: IndexNode(context, name),
indexId_(indexId),
columnHints_(columnHints),
kvstore_(kvstore),
indexNullable_(hasNullableCol) {}
::nebula::cpp2::ErrorCode init(InitContext& ctx) override;
std::string identify() override;

Expand Down Expand Up @@ -139,10 +144,6 @@ class IndexScanNode : public IndexNode {
* @brief index definition
*/
std::shared_ptr<nebula::meta::cpp2::IndexItem> index_;
/**
* @brief if index contain nullable field or not
*/
bool indexNullable_ = false;
const std::vector<cpp2::IndexColumnHint>& columnHints_;
/**
* @see Path
Expand All @@ -153,6 +154,10 @@ class IndexScanNode : public IndexNode {
*/
std::unique_ptr<kvstore::KVIterator> iter_;
nebula::kvstore::KVStore* kvstore_;
/**
* @brief if index contain nullable field or not
*/
bool indexNullable_ = false;
/**
* @brief row format that `doNext` needs to return
*/
Expand Down
5 changes: 3 additions & 2 deletions src/storage/exec/IndexVertexScanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ IndexVertexScanNode::IndexVertexScanNode(const IndexVertexScanNode& node)
IndexVertexScanNode::IndexVertexScanNode(RuntimeContext* context,
IndexID indexId,
const std::vector<cpp2::IndexColumnHint>& columnHint,
::nebula::kvstore::KVStore* kvstore)
: IndexScanNode(context, "IndexVertexScanNode", indexId, columnHint, kvstore) {
::nebula::kvstore::KVStore* kvstore,
bool hasNullableCol)
: IndexScanNode(context, "IndexVertexScanNode", indexId, columnHint, kvstore, hasNullableCol) {
getIndex = std::function([this](std::shared_ptr<IndexItem>& index) {
auto env = this->context_->env();
auto indexMgr = env->indexMan_;
Expand Down
3 changes: 2 additions & 1 deletion src/storage/exec/IndexVertexScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class IndexVertexScanNode final : public IndexScanNode {
IndexVertexScanNode(RuntimeContext* context,
IndexID indexId,
const std::vector<cpp2::IndexColumnHint>& columnHint,
::nebula::kvstore::KVStore* kvstore);
::nebula::kvstore::KVStore* kvstore,
bool hasNullableCol);
::nebula::cpp2::ErrorCode init(InitContext& ctx) override;
std::unique_ptr<IndexNode> copy() override;

Expand Down
42 changes: 35 additions & 7 deletions src/storage/index/LookupProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,11 @@ ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<IndexNode>> LookupProcessor::bu
const cpp2::LookupIndexRequest& req) {
std::vector<std::unique_ptr<IndexNode>> nodes;
for (auto& ctx : req.get_indices().get_contexts()) {
auto node = buildOneContext(ctx);
nodes.emplace_back(std::move(node));
auto scan = buildOneContext(ctx);
if (!ok(scan)) {
return error(scan);
}
nodes.emplace_back(std::move(value(scan)));
}
for (size_t i = 0; i < nodes.size(); i++) {
auto projection =
Expand Down Expand Up @@ -164,17 +167,42 @@ ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<IndexNode>> LookupProcessor::bu
return std::move(nodes[0]);
}

std::unique_ptr<IndexNode> LookupProcessor::buildOneContext(const cpp2::IndexQueryContext& ctx) {
ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<IndexNode>> LookupProcessor::buildOneContext(
const cpp2::IndexQueryContext& ctx) {
std::unique_ptr<IndexNode> node;
DLOG(INFO) << ctx.get_column_hints().size();
DLOG(INFO) << &ctx.get_column_hints();
DLOG(INFO) << ::apache::thrift::SimpleJSONSerializer::serialize<std::string>(ctx);
if (context_->isEdge()) {
node = std::make_unique<IndexEdgeScanNode>(
context_.get(), ctx.get_index_id(), ctx.get_column_hints(), context_->env()->kvstore_);
auto idx = env_->indexMan_->getEdgeIndex(context_->spaceId(), ctx.get_index_id());
if (!idx.ok()) {
return nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND;
}
auto cols = idx.value()->get_fields();
bool hasNullableCol =
std::any_of(cols.begin(), cols.end(), [](const meta::cpp2::ColumnDef& col) {
return col.nullable_ref().value_or(false);
});
node = std::make_unique<IndexEdgeScanNode>(context_.get(),
ctx.get_index_id(),
ctx.get_column_hints(),
context_->env()->kvstore_,
hasNullableCol);
} else {
node = std::make_unique<IndexVertexScanNode>(
context_.get(), ctx.get_index_id(), ctx.get_column_hints(), context_->env()->kvstore_);
auto idx = env_->indexMan_->getTagIndex(context_->spaceId(), ctx.get_index_id());
if (!idx.ok()) {
return nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND;
}
auto cols = idx.value()->get_fields();
bool hasNullableCol =
std::any_of(cols.begin(), cols.end(), [](const meta::cpp2::ColumnDef& col) {
return col.nullable_ref().value_or(false);
});
node = std::make_unique<IndexVertexScanNode>(context_.get(),
ctx.get_index_id(),
ctx.get_column_hints(),
context_->env()->kvstore_,
hasNullableCol);
}
if (ctx.filter_ref().is_set() && !ctx.get_filter().empty()) {
auto expr = Expression::decode(context_->objPool(), *ctx.filter_ref());
Expand Down
3 changes: 2 additions & 1 deletion src/storage/index/LookupProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class LookupProcessor : public BaseProcessor<cpp2::LookupIndexResp> {
::nebula::cpp2::ErrorCode prepare(const cpp2::LookupIndexRequest& req);
ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<IndexNode>> buildPlan(
const cpp2::LookupIndexRequest& req);
std::unique_ptr<IndexNode> buildOneContext(const cpp2::IndexQueryContext& ctx);
ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<IndexNode>> buildOneContext(
const cpp2::IndexQueryContext& ctx);
std::vector<std::unique_ptr<IndexNode>> reproducePlan(IndexNode* root, size_t count);
ErrorOr<nebula::cpp2::ErrorCode, std::vector<std::pair<std::string, cpp2::StatType>>>
handleStatProps(const std::vector<cpp2::StatProp>& statProps);
Expand Down
Loading

0 comments on commit cca71c6

Please sign in to comment.