diff --git a/src/kvstore/listener/elasticsearch/ESListener.cpp b/src/kvstore/listener/elasticsearch/ESListener.cpp index 66927c8c5da..bce1522e0c8 100644 --- a/src/kvstore/listener/elasticsearch/ESListener.cpp +++ b/src/kvstore/listener/elasticsearch/ESListener.cpp @@ -84,27 +84,33 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, if (ftIndex.empty()) { return; } - auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value); - if (reader == nullptr) { - LOG(ERROR) << "get tag reader failed, tagID " << tagId; - return; + nebula::RowReaderWrapper reader; + if (type == BatchLogType::OP_BATCH_PUT) { + reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value); + if (reader == nullptr) { + LOG(ERROR) << "get tag reader failed, tagID " << tagId; + return; + } } for (auto& index : ftIndex) { if (index.second.get_fields().size() > 1) { LOG(ERROR) << "Only one field will create fulltext index"; } - auto field = index.second.get_fields().front(); - auto v = reader->getValueByName(field); - if (v.type() == Value::Type::NULLVALUE) { - continue; - } - if (v.type() != Value::Type::STRING) { - LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + std::string text; + if (type == BatchLogType::OP_BATCH_PUT) { + auto field = index.second.get_fields().front(); + auto v = reader->getValueByName(field); + if (v.type() == Value::Type::NULLVALUE) { + continue; + } + if (v.type() != Value::Type::STRING) { + LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + } + text = std::move(v).getStr(); } std::string indexName = index.first; std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString(); vid = truncateVid(vid); - std::string text = std::move(v).getStr(); callback(type, indexName, vid, "", "", 0, text); } } else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) { @@ -117,28 +123,36 @@ void ESListener::pickTagAndEdgeData(BatchLogType type, return; } auto ftIndex = std::move(ftIndexRes).value(); - auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value); - if (reader == nullptr) { - LOG(ERROR) << "get edge reader failed, schema ID " << edgeType; - return; + nebula::RowReaderWrapper reader; + if (type == BatchLogType::OP_BATCH_PUT) { + reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value); + if (reader == nullptr) { + LOG(ERROR) << "get edge reader failed, schema ID " << edgeType; + return; + } } + for (auto& index : ftIndex) { if (index.second.get_fields().size() > 1) { LOG(ERROR) << "Only one field will create fulltext index"; } - auto field = index.second.get_fields().front(); - auto v = reader->getValueByName(field); - if (v.type() == Value::Type::NULLVALUE) { - continue; - } - if (v.type() != Value::Type::STRING) { - LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + std::string text; + if (type == BatchLogType::OP_BATCH_PUT) { + auto field = index.second.get_fields().front(); + auto v = reader->getValueByName(field); + if (v.type() == Value::Type::NULLVALUE) { + continue; + } + if (v.type() != Value::Type::STRING) { + LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + } + text = std::move(v).getStr(); } std::string indexName = index.first; std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString(); std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString(); int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key); - std::string text = std::move(v).getStr(); + src = truncateVid(src); dst = truncateVid(dst); callback(type, indexName, "", src, dst, rank, text);