Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Support specified columns for full-text index (#460)
Browse files Browse the repository at this point in the history
* draft

* addressed panda's comments
  • Loading branch information
bright-starry-sky authored May 26, 2021
1 parent 57304fc commit cd85c2d
Show file tree
Hide file tree
Showing 23 changed files with 1,038 additions and 35 deletions.
37 changes: 14 additions & 23 deletions src/kvstore/plugins/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ bool ESListener::appendDocItem(std::vector<DocItem>& items, const KV& kv) const

bool ESListener::appendEdgeDocItem(std::vector<DocItem>& items, const KV& kv) const {
auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, kv.first);
auto schema = schemaMan_->getEdgeSchema(spaceId_, edgeType);
if (schema == nullptr) {
VLOG(3) << "get edge schema failed, edgeType " << edgeType;
return false;
auto ftIndex = schemaMan_->getFTIndex(spaceId_, edgeType);
if (!ftIndex.ok()) {
VLOG(3) << "get text search index failed";
return (ftIndex.status() == nebula::Status::IndexNotFound()) ? true : false;
}
auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_,
spaceId_,
Expand All @@ -167,15 +167,15 @@ bool ESListener::appendEdgeDocItem(std::vector<DocItem>& items, const KV& kv) co
VLOG(3) << "get edge reader failed, schema ID " << edgeType;
return false;
}
return appendDocs(items, schema.get(), reader.get(), edgeType, true);
return appendDocs(items, reader.get(), std::move(ftIndex).value());
}

bool ESListener::appendTagDocItem(std::vector<DocItem>& items, const KV& kv) const {
auto tagId = NebulaKeyUtils::getTagId(vIdLen_, kv.first);
auto schema = schemaMan_->getTagSchema(spaceId_, tagId);
if (schema == nullptr) {
VLOG(3) << "get tag schema failed, tagId " << tagId;
return false;
auto ftIndex = schemaMan_->getFTIndex(spaceId_, tagId);
if (!ftIndex.ok()) {
VLOG(3) << "get text search index failed";
return (ftIndex.status() == nebula::Status::IndexNotFound()) ? true : false;
}
auto reader = RowReaderWrapper::getTagPropReader(schemaMan_,
spaceId_,
Expand All @@ -185,27 +185,18 @@ bool ESListener::appendTagDocItem(std::vector<DocItem>& items, const KV& kv) con
VLOG(3) << "get tag reader failed, tagID " << tagId;
return false;
}
return appendDocs(items, schema.get(), reader.get(), tagId, false);
return appendDocs(items, reader.get(), std::move(ftIndex).value());
}

bool ESListener::appendDocs(std::vector<DocItem>& items,
const meta::SchemaProviderIf* schema,
RowReader* reader,
int32_t schemaId,
bool isEdge) const {
auto count = schema->getNumFields();
for (size_t i = 0; i < count; i++) {
auto name = schema->getFieldName(i);
auto v = reader->getValueByName(name);
const std::pair<std::string, nebula::meta::cpp2::FTIndex>& fti) const {
for (const auto& field : fti.second.get_fields()) {
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
continue;
}
auto ftIndex = nebula::plugin::IndexTraits::indexName(*spaceName_, isEdge);
items.emplace_back(DocItem(std::move(ftIndex),
std::move(name),
partId_,
schemaId,
std::move(v).getStr()));
items.emplace_back(DocItem(fti.first, field, partId_, std::move(v).getStr()));
}
return true;
}
Expand Down
7 changes: 2 additions & 5 deletions src/kvstore/plugins/elasticsearch/ESListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,8 @@ class ESListener : public Listener {

bool appendTagDocItem(std::vector<DocItem>& items, const KV& kv) const;

bool appendDocs(std::vector<DocItem>& items,
const meta::SchemaProviderIf* schema,
RowReader* reader,
int32_t schemaId,
bool isEdge) const;
bool appendDocs(std::vector<DocItem>& items, RowReader* reader,
const std::pair<std::string, nebula::meta::cpp2::FTIndex>& fti) const;

bool writeData(const std::vector<nebula::plugin::DocItem>& items) const;

Expand Down
1 change: 1 addition & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ nebula_add_library(
processors/indexMan/GetEdgeIndexProcessor.cpp
processors/indexMan/ListEdgeIndexesProcessor.cpp
processors/indexMan/FTServiceProcessor.cpp
processors/indexMan/FTIndexProcessor.cpp
processors/customKV/GetProcessor.cpp
processors/customKV/MultiGetProcessor.cpp
processors/customKV/MultiPutProcessor.cpp
Expand Down
19 changes: 19 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "meta/processors/indexMan/GetEdgeIndexProcessor.h"
#include "meta/processors/indexMan/ListEdgeIndexesProcessor.h"
#include "meta/processors/indexMan/FTServiceProcessor.h"
#include "meta/processors/indexMan/FTIndexProcessor.h"
#include "meta/processors/customKV/MultiPutProcessor.h"
#include "meta/processors/customKV/GetProcessor.h"
#include "meta/processors/customKV/MultiGetProcessor.h"
Expand Down Expand Up @@ -305,6 +306,24 @@ MetaServiceHandler::future_listFTClients(const cpp2::ListFTClientsReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createFTIndex(const cpp2::CreateFTIndexReq& req) {
auto* processor = CreateFTIndexProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_dropFTIndex(const cpp2::DropFTIndexReq& req) {
auto* processor = DropFTIndexProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ListFTIndexesResp>
MetaServiceHandler::future_listFTIndexes(const cpp2::ListFTIndexesReq& req) {
auto* processor = ListFTIndexesProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::HBResp>
MetaServiceHandler::future_heartBeat(const cpp2::HBReq& req) {
auto* processor = HBProcessor::instance(kvstore_, &kHBCounters, clusterId_);
Expand Down
9 changes: 9 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListFTClientsResp>
future_listFTClients(const cpp2::ListFTClientsReq& req) override;

folly::Future<cpp2::ExecResp>
future_createFTIndex(const cpp2::CreateFTIndexReq& req) override;

folly::Future<cpp2::ExecResp>
future_dropFTIndex(const cpp2::DropFTIndexReq& req) override;

folly::Future<cpp2::ListFTIndexesResp>
future_listFTIndexes(const cpp2::ListFTIndexesReq& req) override;

/**
* User manager
**/
Expand Down
29 changes: 29 additions & 0 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1397,5 +1397,34 @@ meta::cpp2::Session MetaServiceUtils::parseSessionVal(const folly::StringPiece &
apache::thrift::CompactSerializer::deserialize(val, session);
return session;
}

std::string MetaServiceUtils::fulltextIndexKey(const std::string& indexName) {
std::string key;
key.reserve(kFTIndexTable.size() + indexName.size());
key.append(kFTIndexTable.data(), kFTIndexTable.size())
.append(indexName);
return key;
}

std::string MetaServiceUtils::fulltextIndexVal(const cpp2::FTIndex& index) {
std::string val;
apache::thrift::CompactSerializer::serialize(index, &val);
return val;
}

std::string MetaServiceUtils::parsefulltextIndexName(folly::StringPiece key) {
return key.subpiece(kFTIndexTable.size(), key.size()).toString();
}

cpp2::FTIndex MetaServiceUtils::parsefulltextIndex(folly::StringPiece val) {
cpp2::FTIndex ftIndex;
apache::thrift::CompactSerializer::deserialize(val, ftIndex);
return ftIndex;
}

std::string MetaServiceUtils::fulltextIndexPrefix() {
return kFTIndexTable;
}

} // namespace meta
} // namespace nebula
10 changes: 10 additions & 0 deletions src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,16 @@ class MetaServiceUtils final {

static meta::cpp2::Session parseSessionVal(const folly::StringPiece &val);

static std::string fulltextIndexKey(const std::string& indexName);

static std::string fulltextIndexVal(const cpp2::FTIndex& index);

static std::string parsefulltextIndexName(folly::StringPiece key);

static cpp2::FTIndex parsefulltextIndex(folly::StringPiece val);

static std::string fulltextIndexPrefix();

static std::string genTimestampStr();

static GraphSpaceID parseEdgesKeySpaceID(folly::StringPiece key);
Expand Down
7 changes: 7 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,16 @@ class BaseProcessor {
indexCheck(const std::vector<cpp2::IndexItem>& items,
const std::vector<cpp2::AlterSchemaItem>& alterItems);

nebula::cpp2::ErrorCode
ftIndexCheck(const std::vector<std::string>& cols,
const std::vector<cpp2::AlterSchemaItem>& alterItems);

ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::IndexItem>>
getIndexes(GraphSpaceID spaceId, int32_t tagOrEdge);

ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex>
getFTIndex(GraphSpaceID spaceId, int32_t tagOrEdge);

bool checkIndexExist(const std::vector<cpp2::IndexFieldDef>& fields,
const cpp2::IndexItem& item);

Expand Down
47 changes: 46 additions & 1 deletion src/meta/processors/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,31 @@ BaseProcessor<RESP>::getIndexes(GraphSpaceID spaceId, int32_t tagOrEdge) {
}
return items;
}
template<typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex>
BaseProcessor<RESP>::getFTIndex(GraphSpaceID spaceId, int32_t tagOrEdge) {
const auto& indexPrefix = MetaServiceUtils::fulltextIndexPrefix();
auto iterRet = doPrefix(indexPrefix);
if (!nebula::ok(iterRet)) {
auto retCode = nebula::error(iterRet);
LOG(ERROR) << "Tag or edge fulltext index prefix failed, error :"
<< apache::thrift::util::enumNameSafe(retCode);
return retCode;
}
auto indexIter = nebula::value(iterRet).get();

while (indexIter->valid()) {
auto index = MetaServiceUtils::parsefulltextIndex(indexIter->val());
auto id = index.get_depend_schema().getType() == cpp2::SchemaID::Type::edge_type
? index.get_depend_schema().get_edge_type()
: index.get_depend_schema().get_tag_id();
if (spaceId == index.get_space_id() && tagOrEdge == id) {
return index;
}
indexIter->next();
}
return nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND;
}

template<typename RESP>
nebula::cpp2::ErrorCode
Expand Down Expand Up @@ -471,7 +495,28 @@ BaseProcessor<RESP>::indexCheck(const std::vector<cpp2::IndexItem>& items,
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

template<typename RESP>
nebula::cpp2::ErrorCode
BaseProcessor<RESP>::ftIndexCheck(const std::vector<std::string>& cols,
const std::vector<cpp2::AlterSchemaItem>& alterItems) {
for (const auto& item : alterItems) {
if (*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::CHANGE ||
*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::DROP) {
const auto& itemCols = item.get_schema().get_columns();
for (const auto& iCol : itemCols) {
auto it = std::find_if(cols.begin(), cols.end(),
[&] (const auto& c) {
return c == iCol.name;
});
if (it != cols.end()) {
LOG(ERROR) << "fulltext index conflict";
return nebula::cpp2::ErrorCode::E_CONFLICT;
}
}
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

template<typename RESP>
bool BaseProcessor<RESP>::checkIndexExist(const std::vector<cpp2::IndexFieldDef>& fields,
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ GENERATE_LOCK(edge);
GENERATE_LOCK(tagIndex);
GENERATE_LOCK(edgeIndex);
GENERATE_LOCK(fulltextServices);
GENERATE_LOCK(fulltextIndex);
GENERATE_LOCK(user);
GENERATE_LOCK(config);
GENERATE_LOCK(snapshot);
Expand Down
20 changes: 19 additions & 1 deletion src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "common/base/CommonMacro.h"
#include "meta/processors/indexMan/CreateEdgeIndexProcessor.h"

namespace nebula {
Expand Down Expand Up @@ -127,13 +128,30 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
return;
}
cpp2::ColumnDef col = *iter;
if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (col.type.get_type() == meta::cpp2::PropertyType::FIXED_STRING) {
if (*col.type.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
} else if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (!field.type_length_ref().has_value()) {
LOG(ERROR) << "No type length set : " << field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}
if (*field.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
col.type.set_type(meta::cpp2::PropertyType::FIXED_STRING);
col.type.set_type_length(*field.get_type_length());
} else if (field.type_length_ref().has_value()) {
Expand Down
20 changes: 19 additions & 1 deletion src/meta/processors/indexMan/CreateTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "common/base/CommonMacro.h"
#include "meta/processors/indexMan/CreateTagIndexProcessor.h"

namespace nebula {
Expand Down Expand Up @@ -126,13 +127,30 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) {
return;
}
cpp2::ColumnDef col = *iter;
if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (col.type.get_type() == meta::cpp2::PropertyType::FIXED_STRING) {
if (*col.type.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
} else if (col.type.get_type() == meta::cpp2::PropertyType::STRING) {
if (!field.type_length_ref().has_value()) {
LOG(ERROR) << "No type length set : " << field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}
if (*field.get_type_length() > MAX_INDEX_TYPE_LENGTH) {
LOG(ERROR) << "Unsupport index type lengths greater than "
<< MAX_INDEX_TYPE_LENGTH << " : "
<< field.get_name();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNSUPPORTED);
onFinished();
return;
}
col.type.set_type(meta::cpp2::PropertyType::FIXED_STRING);
col.type.set_type_length(*field.get_type_length());
} else if (field.type_length_ref().has_value()) {
Expand Down
Loading

0 comments on commit cd85c2d

Please sign in to comment.