From 74d77fde0e219fbfd91da96b2a2b6a6da42cd1bf Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Thu, 12 Jan 2023 14:10:54 +0800 Subject: [PATCH] fix es delete error 1. remove get Rowreader if op is delete 2. delete es data when value is null --- .../listener/elasticsearch/ESListener.cpp | 94 +++++++++---------- 1 file changed, 43 insertions(+), 51 deletions(-) diff --git a/src/kvstore/listener/elasticsearch/ESListener.cpp b/src/kvstore/listener/elasticsearch/ESListener.cpp index bce1522e0c8..92435e6ec22 100644 --- a/src/kvstore/listener/elasticsearch/ESListener.cpp +++ b/src/kvstore/listener/elasticsearch/ESListener.cpp @@ -73,6 +73,18 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, const std::string& key, const std::string& value, const PickFunc& callback) { + bool isTag = nebula::NebulaKeyUtils::isTag(vIdLen_, key); + bool isEdge = nebula::NebulaKeyUtils::isEdge(vIdLen_, key); + if (!(isTag || isEdge)) { + return; + } + std::unordered_map ftIndexes; + nebula::RowReaderWrapper reader; + + std::string vid; + std::string src; + std::string dst; + int rank = 0; if (nebula::NebulaKeyUtils::isTag(vIdLen_, key)) { auto tagId = NebulaKeyUtils::getTagId(vIdLen_, key); auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, tagId); @@ -80,11 +92,7 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, LOG(ERROR) << ftIndexRes.status().message(); return; } - auto ftIndex = std::move(ftIndexRes).value(); - if (ftIndex.empty()) { - return; - } - nebula::RowReaderWrapper reader; + ftIndexes = std::move(ftIndexRes).value(); if (type == BatchLogType::OP_BATCH_PUT) { reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value); if (reader == nullptr) { @@ -92,28 +100,9 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, return; } } - for (auto& index : ftIndex) { - if (index.second.get_fields().size() > 1) { - LOG(ERROR) << "Only one field will create fulltext index"; - } - std::string text; - if (type == BatchLogType::OP_BATCH_PUT) { - auto field = index.second.get_fields().front(); - auto v = reader->getValueByName(field); - if (v.type() == Value::Type::NULLVALUE) { - continue; - } - if (v.type() != Value::Type::STRING) { - LOG(ERROR) << "Can't create fulltext index on type " << v.type(); - } - text = std::move(v).getStr(); - } - std::string indexName = index.first; - std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString(); - vid = truncateVid(vid); - callback(type, indexName, vid, "", "", 0, text); - } - } else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) { + vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString(); + vid = truncateVid(vid); + } else { auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key); if (edgeType < 0) { return; @@ -122,8 +111,7 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, if (!ftIndexRes.ok()) { return; } - auto ftIndex = std::move(ftIndexRes).value(); - nebula::RowReaderWrapper reader; + ftIndexes = std::move(ftIndexRes).value(); if (type == BatchLogType::OP_BATCH_PUT) { reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value); if (reader == nullptr) { @@ -131,32 +119,36 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, return; } } + src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString(); + dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString(); + rank = NebulaKeyUtils::getRank(vIdLen_, key); - for (auto& index : ftIndex) { - if (index.second.get_fields().size() > 1) { - LOG(ERROR) << "Only one field will create fulltext index"; + src = truncateVid(src); + dst = truncateVid(dst); + } + if (ftIndexes.empty()) { + return; + } + + for (auto& index : ftIndexes) { + if (index.second.get_fields().size() > 1) { + LOG(ERROR) << "Only one field will create fulltext index"; + } + std::string text; + std::string indexName = index.first; + if (type == BatchLogType::OP_BATCH_PUT) { + auto field = index.second.get_fields().front(); + auto v = reader->getValueByName(field); + if (v.type() == Value::Type::NULLVALUE) { + callback(BatchLogType::OP_BATCH_REMOVE, indexName, vid, src, dst, 0, text); + continue; } - std::string text; - if (type == BatchLogType::OP_BATCH_PUT) { - auto field = index.second.get_fields().front(); - auto v = reader->getValueByName(field); - if (v.type() == Value::Type::NULLVALUE) { - continue; - } - if (v.type() != Value::Type::STRING) { - LOG(ERROR) << "Can't create fulltext index on type " << v.type(); - } - text = std::move(v).getStr(); + if (v.type() != Value::Type::STRING) { + LOG(ERROR) << "Can't create fulltext index on type " << v.type(); } - std::string indexName = index.first; - std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString(); - std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString(); - int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key); - - src = truncateVid(src); - dst = truncateVid(dst); - callback(type, indexName, "", src, dst, rank, text); + text = std::move(v).getStr(); } + callback(type, indexName, vid, src, dst, rank, text); } }