From 6aed18fdd311a9c287b2124205f771480e379d4e Mon Sep 17 00:00:00 2001 From: cpw <13495049+CPWstatic@users.noreply.github.com> Date: Wed, 7 Apr 2021 16:50:19 +0800 Subject: [PATCH] Fix get all tags (#2461) * Fix get all tags. * Fix empty props. Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com> --- src/graph/FetchVerticesExecutor.cpp | 145 +++++++++++++++++++--------- src/graph/FetchVerticesExecutor.h | 12 +++ src/graph/TraverseExecutor.cpp | 30 ++++-- src/parser/TraverseSentences.h | 5 + 4 files changed, 141 insertions(+), 51 deletions(-) diff --git a/src/graph/FetchVerticesExecutor.cpp b/src/graph/FetchVerticesExecutor.cpp index d0920ac68f3..50e8a386263 100644 --- a/src/graph/FetchVerticesExecutor.cpp +++ b/src/graph/FetchVerticesExecutor.cpp @@ -108,7 +108,7 @@ Status FetchVerticesExecutor::prepareTags() { return Status::Error("tags shall never be empty"); } - if (tagNames.size() == 1 && *tagNames[0] == "*") { + if (sentence_->isAllTags()) { auto tagsStatus = ectx()->schemaManager()->getAllTag(spaceId_); if (!tagsStatus.ok()) { return tagsStatus.status(); @@ -363,48 +363,10 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) { std::unordered_map> tagSchemaMap; std::set tagIdSet; - for (auto &resp : all) { - if (!resp.__isset.vertices || resp.vertices.empty()) { - continue; - } - auto *vertexSchema = resp.get_vertex_schema(); - if (vertexSchema != nullptr) { - std::transform(vertexSchema->cbegin(), vertexSchema->cend(), - std::inserter(tagSchemaMap, tagSchemaMap.begin()), [](auto &s) { - return std::make_pair( - s.first, std::make_shared(s.second)); - }); - } - - for (auto &vdata : resp.vertices) { - if (!vdata.__isset.tag_data || vdata.tag_data.empty()) { - continue; - } - for (auto& tagData : vdata.tag_data) { - auto& data = tagData.data; - VertexID vid = vdata.vertex_id; - TagID tagId = tagData.tag_id; - if (tagSchemaMap.find(tagId) == tagSchemaMap.end()) { - auto ver = RowReader::getSchemaVer(data); - if (ver < 0) { - LOG(ERROR) << "Found schema version negative " << ver; - doError(Status::Error("Found schema version negative: %d", ver)); - return; - } - auto schema = ectx()->schemaManager()->getTagSchema(spaceId_, tagId, ver); - if (schema == nullptr) { - VLOG(3) << "Schema not found for tag id: " << tagId; - // Ignore the bad data. - continue; - } - tagSchemaMap[tagId] = schema; - } - auto vschema = tagSchemaMap[tagId]; - auto vreader = RowReader::getRowReader(data, vschema); - dataMap[vid].emplace(std::make_pair(tagId, std::move(vreader))); - tagIdSet.insert(tagId); - } - } + if (sentence_->isAllTags() && yieldClause_ == nullptr) { + processAll(all, dataMap, tagSchemaMap, tagIdSet); + } else { + process(all, dataMap, tagSchemaMap, tagIdSet); } if (yieldClause_ == nullptr) { @@ -473,7 +435,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) { auto tagIter = ds.find(tagId); if (tagIter != ds.end()) { auto vreader = tagIter->second.get(); - auto vschema = vreader->getSchema().get(); + auto vschema = tagSchemaMap[tagId].get(); return Collector::getProp(vschema, prop, vreader); } else { auto ts = ectx()->schemaManager()->getTagSchema(spaceId_, tagId); @@ -543,7 +505,7 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) { auto tagIter = ds.find(tagId); if (tagIter != ds.end()) { auto vreader = tagIter->second.get(); - auto vschema = vreader->getSchema().get(); + auto vschema = tagSchemaMap[tagId].get(); return Collector::getProp(vschema, prop, vreader); } else { auto ts = ectx()->schemaManager()->getTagSchema(spaceId_, tagId); @@ -588,6 +550,99 @@ void FetchVerticesExecutor::processResult(RpcResponse &&result) { finishExecution(std::move(rsWriter)); } +void FetchVerticesExecutor::process( + const std::vector &all, + std::unordered_map> &dataMap, + std::unordered_map> &tagSchemaMap, + std::set &tagIdSet) { + for (auto &resp : all) { + if (!resp.__isset.vertices || resp.vertices.empty()) { + continue; + } + auto *vertexSchema = resp.get_vertex_schema(); + if (vertexSchema != nullptr) { + std::transform(vertexSchema->cbegin(), vertexSchema->cend(), + std::inserter(tagSchemaMap, tagSchemaMap.begin()), [](auto &s) { + return std::make_pair( + s.first, std::make_shared(s.second)); + }); + } + + for (auto &vdata : resp.vertices) { + if (!vdata.__isset.tag_data || vdata.tag_data.empty()) { + continue; + } + for (auto& tagData : vdata.tag_data) { + auto& data = tagData.data; + VertexID vid = vdata.vertex_id; + TagID tagId = tagData.tag_id; + auto vschema = tagSchemaMap[tagId]; + auto& readers = dataMap[vid]; + if (vschema == nullptr) { + auto latestSchema = ectx()->schemaManager()->getTagSchema(spaceId_, tagId); + if (latestSchema == nullptr) { + VLOG(3) << "Schema not found for tag id: " << tagId; + // Ignore the bad data. + continue; + } + tagSchemaMap[tagId] = latestSchema; + vschema = latestSchema; + } + auto vreader = RowReader::getRowReader(data, vschema); + readers.emplace(std::make_pair(tagId, std::move(vreader))); + tagIdSet.insert(tagId); + } + } + } +} + +void FetchVerticesExecutor::processAll( + const std::vector &all, + std::unordered_map> &dataMap, + std::unordered_map> &tagSchemaMap, + std::set &tagIdSet) { + for (auto &resp : all) { + if (!resp.__isset.vertices || resp.vertices.empty()) { + continue; + } + + for (auto &vdata : resp.vertices) { + if (!vdata.__isset.tag_data || vdata.tag_data.empty()) { + continue; + } + for (auto& tagData : vdata.tag_data) { + auto& data = tagData.data; + VertexID vid = vdata.vertex_id; + TagID tagId = tagData.tag_id; + auto ver = RowReader::getSchemaVer(data); + if (ver < 0) { + LOG(ERROR) << "Found schema version negative " << ver; + doError(Status::Error("Found schema version negative: %d", ver)); + return; + } + auto schema = ectx()->schemaManager()->getTagSchema(spaceId_, tagId, ver); + if (schema == nullptr) { + VLOG(3) << "Schema not found for tag id: " << tagId; + // Ignore the bad data. + continue; + } + if (tagSchemaMap.find(tagId) == tagSchemaMap.end()) { + auto latestSchema = ectx()->schemaManager()->getTagSchema(spaceId_, tagId); + if (latestSchema == nullptr) { + VLOG(3) << "Schema not found for tag id: " << tagId; + // Ignore the bad data. + continue; + } + tagSchemaMap[tagId] = latestSchema; + } + auto vreader = RowReader::getRowReader(data, schema); + dataMap[vid].emplace(std::make_pair(tagId, std::move(vreader))); + tagIdSet.insert(tagId); + } + } + } +} + void FetchVerticesExecutor::setupResponse(cpp2::ExecutionResponse &resp) { if (resp_ == nullptr) { resp_ = std::make_unique(); diff --git a/src/graph/FetchVerticesExecutor.h b/src/graph/FetchVerticesExecutor.h index c683496b118..155be6148bc 100644 --- a/src/graph/FetchVerticesExecutor.h +++ b/src/graph/FetchVerticesExecutor.h @@ -52,6 +52,18 @@ class FetchVerticesExecutor final : public TraverseExecutor { kRef, }; + void process( + const std::vector &all, + std::unordered_map> &dataMap, + std::unordered_map> &tagSchemaMap, + std::set &tagIdSet); + + void processAll( + const std::vector &all, + std::unordered_map> &dataMap, + std::unordered_map> &tagSchemaMap, + std::set &tagIdSet); + private: GraphSpaceID spaceId_{-1}; FromType fromType_{kInstantExpr}; diff --git a/src/graph/TraverseExecutor.cpp b/src/graph/TraverseExecutor.cpp index 75f5f92c688..f5dc8d61660 100644 --- a/src/graph/TraverseExecutor.cpp +++ b/src/graph/TraverseExecutor.cpp @@ -248,38 +248,56 @@ OptVariantType Collector::getProp(const meta::SchemaProviderIf *schema, switch (type) { case SupportedType::BOOL: { bool v; - reader->getBool(prop, v); + auto ret = reader->getBool(prop, v); + if (ret != ResultType::SUCCEEDED) { + return RowReader::getDefaultProp(type); + } VLOG(3) << "get prop: " << prop << ", value: " << v; return v; } case SupportedType::TIMESTAMP: case SupportedType::INT: { int64_t v; - reader->getInt(prop, v); + auto ret = reader->getInt(prop, v); + if (ret != ResultType::SUCCEEDED) { + return RowReader::getDefaultProp(type); + } VLOG(3) << "get prop: " << prop << ", value: " << v; return v; } case SupportedType::VID: { VertexID v; - reader->getVid(prop, v); + auto ret = reader->getVid(prop, v); + if (ret != ResultType::SUCCEEDED) { + return RowReader::getDefaultProp(type); + } VLOG(3) << "get prop: " << prop << ", value: " << v; return v; } case SupportedType::FLOAT: { float v; - reader->getFloat(prop, v); + auto ret = reader->getFloat(prop, v); + if (ret != ResultType::SUCCEEDED) { + return RowReader::getDefaultProp(type); + } VLOG(3) << "get prop: " << prop << ", value: " << v; return static_cast(v); } case SupportedType::DOUBLE: { double v; - reader->getDouble(prop, v); + auto ret = reader->getDouble(prop, v); + if (ret != ResultType::SUCCEEDED) { + return RowReader::getDefaultProp(type); + } VLOG(3) << "get prop: " << prop << ", value: " << v; return v; } case SupportedType::STRING: { folly::StringPiece v; - reader->getString(prop, v); + auto ret = reader->getString(prop, v); + if (ret != ResultType::SUCCEEDED) { + return RowReader::getDefaultProp(type); + } VLOG(3) << "get prop: " << prop << ", value: " << v; return v.toString(); } diff --git a/src/parser/TraverseSentences.h b/src/parser/TraverseSentences.h index c9bd728b0f8..680469d08c5 100644 --- a/src/parser/TraverseSentences.h +++ b/src/parser/TraverseSentences.h @@ -331,6 +331,11 @@ class FetchVerticesSentence final : public Sentence { std::string toString() const override; + bool isAllTags() const { + const auto& tagNames = tags_->labels(); + return tagNames.size() == 1 && *tagNames[0] == "*"; + } + private: std::unique_ptr tags_; std::unique_ptr vidList_;