Skip to content

Commit

Permalink
fix delete fulltext index (#5239)
Browse files Browse the repository at this point in the history
* fix delete fulltext index

* fix es delete error
1. remove get Rowreader if op is delete
2. delete es data when value is null

Co-authored-by: Sophie <[email protected]>
  • Loading branch information
cangfengzhs and Sophie-Xie committed Jan 28, 2023
1 parent b391aad commit 19cc9b7
Showing 1 changed file with 49 additions and 43 deletions.
92 changes: 49 additions & 43 deletions src/kvstore/listener/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,41 +73,36 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
const std::string& key,
const std::string& value,
const PickFunc& callback) {
bool isTag = nebula::NebulaKeyUtils::isTag(vIdLen_, key);
bool isEdge = nebula::NebulaKeyUtils::isEdge(vIdLen_, key);
if (!(isTag || isEdge)) {
return;
}
std::unordered_map<std::string, nebula::meta::cpp2::FTIndex> ftIndexes;
nebula::RowReaderWrapper reader;

std::string vid;
std::string src;
std::string dst;
int rank = 0;
if (nebula::NebulaKeyUtils::isTag(vIdLen_, key)) {
auto tagId = NebulaKeyUtils::getTagId(vIdLen_, key);
auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, tagId);
if (!ftIndexRes.ok()) {
LOG(ERROR) << ftIndexRes.status().message();
return;
}
auto ftIndex = std::move(ftIndexRes).value();
if (ftIndex.empty()) {
return;
}
auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value);
if (reader == nullptr) {
LOG(ERROR) << "get tag reader failed, tagID " << tagId;
return;
}
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() == Value::Type::NULLVALUE) {
continue;
}
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
ftIndexes = std::move(ftIndexRes).value();
if (type == BatchLogType::OP_BATCH_PUT) {
reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value);
if (reader == nullptr) {
LOG(ERROR) << "get tag reader failed, tagID " << tagId;
return;
}
std::string indexName = index.first;
std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
vid = truncateVid(vid);
std::string text = std::move(v).getStr();
callback(type, indexName, vid, "", "", 0, text);
}
} else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) {
vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
vid = truncateVid(vid);
} else {
auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key);
if (edgeType < 0) {
return;
Expand All @@ -116,33 +111,44 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
if (!ftIndexRes.ok()) {
return;
}
auto ftIndex = std::move(ftIndexRes).value();
auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value);
if (reader == nullptr) {
LOG(ERROR) << "get edge reader failed, schema ID " << edgeType;
return;
}
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
ftIndexes = std::move(ftIndexRes).value();
if (type == BatchLogType::OP_BATCH_PUT) {
reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value);
if (reader == nullptr) {
LOG(ERROR) << "get edge reader failed, schema ID " << edgeType;
return;
}
}
src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
rank = NebulaKeyUtils::getRank(vIdLen_, key);

src = truncateVid(src);
dst = truncateVid(dst);
}
if (ftIndexes.empty()) {
return;
}

for (auto& index : ftIndexes) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
std::string text;
std::string indexName = index.first;
if (type == BatchLogType::OP_BATCH_PUT) {
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() == Value::Type::NULLVALUE) {
callback(BatchLogType::OP_BATCH_REMOVE, indexName, vid, src, dst, 0, text);
continue;
}
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
std::string indexName = index.first;
std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key);
std::string text = std::move(v).getStr();
src = truncateVid(src);
dst = truncateVid(dst);
callback(type, indexName, "", src, dst, rank, text);
text = std::move(v).getStr();
}
callback(type, indexName, vid, src, dst, rank, text);
}
}

Expand Down

0 comments on commit 19cc9b7

Please sign in to comment.