diff --git a/src/kvstore/listener/elasticsearch/ESListener.cpp b/src/kvstore/listener/elasticsearch/ESListener.cpp index 66927c8c5da..92435e6ec22 100644 --- a/src/kvstore/listener/elasticsearch/ESListener.cpp +++ b/src/kvstore/listener/elasticsearch/ESListener.cpp @@ -73,6 +73,18 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, const std::string& key, const std::string& value, const PickFunc& callback) { + bool isTag = nebula::NebulaKeyUtils::isTag(vIdLen_, key); + bool isEdge = nebula::NebulaKeyUtils::isEdge(vIdLen_, key); + if (!(isTag || isEdge)) { + return; + } + std::unordered_map ftIndexes; + nebula::RowReaderWrapper reader; + + std::string vid; + std::string src; + std::string dst; + int rank = 0; if (nebula::NebulaKeyUtils::isTag(vIdLen_, key)) { auto tagId = NebulaKeyUtils::getTagId(vIdLen_, key); auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, tagId); @@ -80,34 +92,17 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, LOG(ERROR) << ftIndexRes.status().message(); return; } - auto ftIndex = std::move(ftIndexRes).value(); - if (ftIndex.empty()) { - return; - } - auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value); - if (reader == nullptr) { - LOG(ERROR) << "get tag reader failed, tagID " << tagId; - return; - } - for (auto& index : ftIndex) { - if (index.second.get_fields().size() > 1) { - LOG(ERROR) << "Only one field will create fulltext index"; - } - auto field = index.second.get_fields().front(); - auto v = reader->getValueByName(field); - if (v.type() == Value::Type::NULLVALUE) { - continue; - } - if (v.type() != Value::Type::STRING) { - LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + ftIndexes = std::move(ftIndexRes).value(); + if (type == BatchLogType::OP_BATCH_PUT) { + reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value); + if (reader == nullptr) { + LOG(ERROR) << "get tag reader failed, tagID " << tagId; + return; } - std::string indexName = index.first; - std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString(); - vid = truncateVid(vid); - std::string text = std::move(v).getStr(); - callback(type, indexName, vid, "", "", 0, text); } - } else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) { + vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString(); + vid = truncateVid(vid); + } else { auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key); if (edgeType < 0) { return; @@ -116,33 +111,44 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, if (!ftIndexRes.ok()) { return; } - auto ftIndex = std::move(ftIndexRes).value(); - auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value); - if (reader == nullptr) { - LOG(ERROR) << "get edge reader failed, schema ID " << edgeType; - return; - } - for (auto& index : ftIndex) { - if (index.second.get_fields().size() > 1) { - LOG(ERROR) << "Only one field will create fulltext index"; + ftIndexes = std::move(ftIndexRes).value(); + if (type == BatchLogType::OP_BATCH_PUT) { + reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value); + if (reader == nullptr) { + LOG(ERROR) << "get edge reader failed, schema ID " << edgeType; + return; } + } + src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString(); + dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString(); + rank = NebulaKeyUtils::getRank(vIdLen_, key); + + src = truncateVid(src); + dst = truncateVid(dst); + } + if (ftIndexes.empty()) { + return; + } + + for (auto& index : ftIndexes) { + if (index.second.get_fields().size() > 1) { + LOG(ERROR) << "Only one field will create fulltext index"; + } + std::string text; + std::string indexName = index.first; + if (type == BatchLogType::OP_BATCH_PUT) { auto field = index.second.get_fields().front(); auto v = reader->getValueByName(field); if (v.type() == Value::Type::NULLVALUE) { + callback(BatchLogType::OP_BATCH_REMOVE, indexName, vid, src, dst, 0, text); continue; } if (v.type() != Value::Type::STRING) { LOG(ERROR) << "Can't create fulltext index on type " << v.type(); } - std::string indexName = index.first; - std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString(); - std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString(); - int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key); - std::string text = std::move(v).getStr(); - src = truncateVid(src); - dst = truncateVid(dst); - callback(type, indexName, "", src, dst, rank, text); + text = std::move(v).getStr(); } + callback(type, indexName, vid, src, dst, rank, text); } }