Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix delete fulltext index #5239

Merged
merged 3 commits into from
Jan 12, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 49 additions & 43 deletions src/kvstore/listener/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,41 +73,36 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
const std::string& key,
const std::string& value,
const PickFunc& callback) {
bool isTag = nebula::NebulaKeyUtils::isTag(vIdLen_, key);
bool isEdge = nebula::NebulaKeyUtils::isEdge(vIdLen_, key);
if (!(isTag || isEdge)) {
return;
}
std::unordered_map<std::string, nebula::meta::cpp2::FTIndex> ftIndexes;
nebula::RowReaderWrapper reader;

std::string vid;
std::string src;
std::string dst;
int rank = 0;
if (nebula::NebulaKeyUtils::isTag(vIdLen_, key)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reuse isTag?

auto tagId = NebulaKeyUtils::getTagId(vIdLen_, key);
auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, tagId);
if (!ftIndexRes.ok()) {
LOG(ERROR) << ftIndexRes.status().message();
return;
}
auto ftIndex = std::move(ftIndexRes).value();
if (ftIndex.empty()) {
return;
}
auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value);
if (reader == nullptr) {
LOG(ERROR) << "get tag reader failed, tagID " << tagId;
return;
}
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() == Value::Type::NULLVALUE) {
continue;
}
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
ftIndexes = std::move(ftIndexRes).value();
if (type == BatchLogType::OP_BATCH_PUT) {
reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value);
if (reader == nullptr) {
LOG(ERROR) << "get tag reader failed, tagID " << tagId;
return;
}
std::string indexName = index.first;
std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
vid = truncateVid(vid);
std::string text = std::move(v).getStr();
callback(type, indexName, vid, "", "", 0, text);
}
} else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) {
vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
vid = truncateVid(vid);
} else {
auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key);
if (edgeType < 0) {
return;
Expand All @@ -116,33 +111,44 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
if (!ftIndexRes.ok()) {
return;
}
auto ftIndex = std::move(ftIndexRes).value();
auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value);
if (reader == nullptr) {
LOG(ERROR) << "get edge reader failed, schema ID " << edgeType;
return;
}
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
ftIndexes = std::move(ftIndexRes).value();
if (type == BatchLogType::OP_BATCH_PUT) {
reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value);
if (reader == nullptr) {
LOG(ERROR) << "get edge reader failed, schema ID " << edgeType;
return;
}
}
src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
rank = NebulaKeyUtils::getRank(vIdLen_, key);

src = truncateVid(src);
dst = truncateVid(dst);
}
if (ftIndexes.empty()) {
return;
}

for (auto& index : ftIndexes) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
std::string text;
std::string indexName = index.first;
if (type == BatchLogType::OP_BATCH_PUT) {
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() == Value::Type::NULLVALUE) {
callback(BatchLogType::OP_BATCH_REMOVE, indexName, vid, src, dst, 0, text);
continue;
}
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
std::string indexName = index.first;
std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key);
std::string text = std::move(v).getStr();
src = truncateVid(src);
dst = truncateVid(dst);
callback(type, indexName, "", src, dst, rank, text);
text = std::move(v).getStr();
}
callback(type, indexName, vid, src, dst, rank, text);
}
}

Expand Down