From 40934f6da7ffdc5b402515aaeced25fbd73d79cf Mon Sep 17 00:00:00 2001 From: trippli Date: Sat, 11 Jul 2020 17:56:17 +0800 Subject: [PATCH] rewrite fetch prop on vertex: 1. support input/variable/multi-vertex-id/multi-tags. 2. keep order of input. --- .gitignore | 3 + src/graph/CloudAuthenticator.cpp | 3 +- src/graph/FetchVerticesExecutor.cpp | 813 ++++++++++++++++---------- src/graph/FetchVerticesExecutor.h | 47 +- src/graph/TraverseExecutor.cpp | 4 +- src/graph/test/FetchVerticesTest.cpp | 111 +++- src/meta/SchemaManager.h | 2 + src/meta/ServerBasedSchemaManager.cpp | 5 + src/meta/ServerBasedSchemaManager.h | 2 + src/meta/client/MetaClient.cpp | 30 +- src/meta/client/MetaClient.h | 11 +- src/parser/Clauses.cpp | 13 + src/parser/Clauses.h | 23 + src/parser/TraverseSentences.cpp | 2 +- src/parser/TraverseSentences.h | 29 +- src/parser/parser.yy | 36 +- src/parser/test/ParserTest.cpp | 59 +- src/storage/test/AdHocSchemaManager.h | 5 + 18 files changed, 836 insertions(+), 362 deletions(-) diff --git a/.gitignore b/.gitignore index ecdc0b4df3d..21d4569e78d 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,6 @@ pids/ cmake-build-debug/ cmake-build-release/ .vscode/ +cmake-build-debug* +cmake-build-release* + diff --git a/src/graph/CloudAuthenticator.cpp b/src/graph/CloudAuthenticator.cpp index 5464bcc4a1b..bbbfb9b7b90 100644 --- a/src/graph/CloudAuthenticator.cpp +++ b/src/graph/CloudAuthenticator.cpp @@ -27,13 +27,12 @@ bool CloudAuthenticator::auth(const std::string& user, const std::string& passwo } // Second, use user + password authentication methods - StatusOr result; std::string userAndPasswd = user + ":" + password; std::string base64Str = encryption::Base64::encode(userAndPasswd); std::string header = "-H \"Content-Type: application/json\" -H \"Authorization:Nebula "; header = header + base64Str + "\""; - result = http::HttpClient::post(FLAGS_cloud_http_url, header); + auto result = http::HttpClient::post(FLAGS_cloud_http_url, header); if (!result.ok()) { LOG(ERROR) << result.status(); diff --git a/src/graph/FetchVerticesExecutor.cpp b/src/graph/FetchVerticesExecutor.cpp index 4a9fe846f77..4aac10bb6b4 100644 --- a/src/graph/FetchVerticesExecutor.cpp +++ b/src/graph/FetchVerticesExecutor.cpp @@ -11,8 +11,9 @@ namespace nebula { namespace graph { + FetchVerticesExecutor::FetchVerticesExecutor(Sentence *sentence, ExecutionContext *ectx) - : FetchExecutor(ectx, "fetch_vertices") { + : TraverseExecutor(ectx, "fetch_vertices") { sentence_ = static_cast(sentence); } @@ -20,70 +21,24 @@ Status FetchVerticesExecutor::prepare() { return Status::OK(); } -Status FetchVerticesExecutor::prepareClauses() { - Status status = Status::OK(); - - do { - status = checkIfGraphSpaceChosen(); - if (!status.ok()) { - break; - } - - expCtx_ = std::make_unique(); - spaceId_ = ectx()->rctx()->session()->space(); - expCtx_->setStorageClient(ectx()->getStorageClient()); - if (sentence_->isAllTagProps()) { - break; - } - - yieldClause_ = DCHECK_NOTNULL(sentence_)->yieldClause(); - labelName_ = sentence_->tag(); - auto result = ectx()->schemaManager()->toTagID(spaceId_, *labelName_); - if (!result.ok()) { - LOG(ERROR) << "Get Tag Id failed: " << result.status(); - status = result.status(); - break; - } - tagId_ = result.value(); - labelSchema_ = ectx()->schemaManager()->getTagSchema(spaceId_, tagId_); - if (labelSchema_ == nullptr) { - LOG(ERROR) << *labelName_ << " tag schema not exist."; - status = Status::Error("%s tag schema not exist.", labelName_->c_str()); - break; - } - - status = prepareVids(); - if (!status.ok()) { - LOG(ERROR) << "Prepare vertex id failed: " << status; - break; - } - - // Add VertexID before prepareYield() - returnColNames_.emplace_back("VertexID"); - status = prepareYield(); - if (!status.ok()) { - LOG(ERROR) << "Prepare yield failed: " << status; - break; - } - status = checkTagProps(); - if (!status.ok()) { - LOG(ERROR) << "Check props failed: " << status; - break; - } - } while (false); - return status; -} - Status FetchVerticesExecutor::prepareVids() { + Status status = Status::OK(); if (sentence_->isRef()) { + fromType_ = kRef; auto *expr = sentence_->ref(); if (expr->isInputExpression()) { auto *iexpr = static_cast(expr); colname_ = iexpr->prop(); + inputs_p_ = inputs_.get(); } else if (expr->isVariableExpression()) { auto *vexpr = static_cast(expr); - varname_ = vexpr->alias(); + auto varname = vexpr->alias(); colname_ = vexpr->prop(); + bool existing = false; + inputs_p_ = ectx()->variableHolder()->get(*varname, &existing); + if (!existing) { + return Status::Error("Variable `%s' not defined", varname->c_str()); + } } else { // should never come to here. // only support input and variable yet. @@ -93,35 +48,280 @@ Status FetchVerticesExecutor::prepareVids() { if (colname_ != nullptr && *colname_ == "*") { return Status::Error("Cant not use `*' to reference a vertex id column."); } + if (inputs_p_ == nullptr || !inputs_p_->hasData()) { + return Status::OK(); + } + status = checkIfDuplicateColumn(); + if (!status.ok()) { + return status; + } + auto vidsStatus = inputs_p_->getDistinctVIDs(*colname_); + if (!vidsStatus.ok()) { + return std::move(vidsStatus).status(); + } + vids_ = std::move(vidsStatus).value(); + return Status::OK(); + } else { + fromType_ = kInstantExpr; + std::unordered_set uniqID; + for (auto *expr : sentence_->vidList()) { + expr->setContext(expCtx_.get()); + status = expr->prepare(); + if (!status.ok()) { + break; + } + Getters getters; + auto value = expr->eval(getters); + if (!value.ok()) { + return value.status(); + } + auto v = value.value(); + if (!Expression::isInt(v)) { + status = Status::Error("Vertex ID should be of type integer"); + break; + } + auto valInt = Expression::asInt(v); + if (distinct_) { + auto result = uniqID.emplace(valInt); + if (result.second) { + vids_.emplace_back(valInt); + } + } else { + vids_.emplace_back(valInt); + } + } } - return Status::OK(); + return status; } -Status FetchVerticesExecutor::checkTagProps() { - auto aliasProps = expCtx_->aliasProps(); - for (auto &pair : aliasProps) { - if (pair.first != *labelName_) { +Status FetchVerticesExecutor::prepareTags() { + Status status = Status::OK(); + auto* tags = sentence_->tags(); + if (tags == nullptr) { + LOG(ERROR) << "tags shall never be null"; + return Status::Error("tags shall never be null"); + } + + auto tagNames = tags->labels(); + if (tagNames.empty()) { + LOG(ERROR) << "tags shall never be empty"; + return Status::Error("tags shall never be empty"); + } + + if (tagNames.size() == 1 && *tagNames[0] == "*") { + auto tagsStatus = ectx()->schemaManager()->getAllTag(spaceId_); + if (!tagsStatus.ok()) { + return tagsStatus.status(); + } + for (auto& tagName : std::move(tagsStatus).value()) { + auto tagIdStatus = ectx()->schemaManager()->toTagID(spaceId_, tagName); + if (!tagIdStatus.ok()) { + return tagIdStatus.status(); + } + auto tagId = tagIdStatus.value(); + tagNames_.push_back(tagName); + tagIds_.push_back(tagId); + auto result = tagNameSet_.emplace(tagName); + if (!result.second) { + return Status::Error(folly::sformat("tag({}) was dup", tagName)); + } + } + } else { + for (auto tagName : tagNames) { + auto tagStatus = ectx()->schemaManager()->toTagID(spaceId_, *tagName); + if (!tagStatus.ok()) { + return tagStatus.status(); + } + auto tagId = tagStatus.value(); + tagNames_.push_back(*tagName); + tagIds_.push_back(tagId); + auto result = tagNameSet_.emplace(*tagName); + if (!result.second) { + return Status::Error(folly::sformat("tag({}) was dup", *tagName)); + } + } + } + + return status; +} + +Status FetchVerticesExecutor::prepareYield() { + { + auto *column = new YieldColumn( + new InputPropertyExpression(new std::string("VertexID")), + new std::string("VertexID") + ); + yieldColsHolder_.addColumn(column); + yields_.emplace_back(column); + colNames_.emplace_back("VertexID"); + colTypes_.emplace_back(nebula::cpp2::SupportedType::VID); + } + if (yieldClause_ == nullptr) { + // determine which columns to return after received response from storage. + for (unsigned i = 0; i < tagNames_.size(); i++) { + auto& tagName = tagNames_[i]; + auto tagId = tagIds_[i]; + std::shared_ptr tagSchema = + ectx()->schemaManager()->getTagSchema(spaceId_, tagId); + if (tagSchema == nullptr) { + return Status::Error("No tag schema for %s", tagName.c_str()); + } + for (auto iter = tagSchema->begin(); iter != tagSchema->end(); ++iter) { + auto *prop = iter->getName(); + storage::cpp2::PropDef pd; + pd.owner = storage::cpp2::PropOwner::SOURCE; + pd.name = prop; + pd.id.set_tag_id(tagId); + props_.emplace_back(std::move(pd)); + } + } + } else { + for (auto *col : yieldClause_->columns()) { + if (!col->getFunName().empty()) { + return Status::SyntaxError("Do not support aggregated query with fetch prop on."); + } + + if (col->expr()->isInputExpression()) { + auto *inputExpr = static_cast(col->expr()); + auto *colName = inputExpr->prop(); + if (*colName == "*") { + auto colNames = inputs_p_->getColNames(); + for (auto &prop : colNames) { + Expression *expr = new InputPropertyExpression(new std::string(prop)); + auto *column = new YieldColumn(expr); + yieldColsHolder_.addColumn(column); + yields_.emplace_back(column); + colNames_.emplace_back(column->toString()); + colTypes_.emplace_back(nebula::cpp2::SupportedType::UNKNOWN); + expCtx_->addInputProp(prop); + } + continue; + } + } else if (col->expr()->isVariableExpression()) { + auto *variableExpr = static_cast(col->expr()); + auto *colName = variableExpr->prop(); + if (*colName == "*") { + auto colNames = inputs_p_->getColNames(); + for (auto &prop : colNames) { + auto *alias = new std::string(*(variableExpr->alias())); + Expression *expr = + new VariablePropertyExpression(alias, new std::string(prop)); + auto *column = new YieldColumn(expr); + yieldColsHolder_.addColumn(column); + yields_.emplace_back(column); + colNames_.emplace_back(column->toString()); + colTypes_.emplace_back(nebula::cpp2::SupportedType::UNKNOWN); + expCtx_->addInputProp(prop); + } + continue; + } + } + + yields_.emplace_back(col); + col->expr()->setContext(expCtx_.get()); + Status status = col->expr()->prepare(); + if (!status.ok()) { + return status; + } + if (col->alias() == nullptr) { + colNames_.emplace_back(col->expr()->toString()); + } else { + colNames_.emplace_back(*col->alias()); + } + auto type = calculateExprType(col->expr()); + colTypes_.emplace_back(type); + VLOG(1) << "type: " << static_cast(colTypes_.back()); + } + if (expCtx_->hasSrcTagProp() || expCtx_->hasDstTagProp()) { return Status::SyntaxError( - "Near [%s.%s], tag should be declared in `ON' clause first.", - pair.first.c_str(), pair.second.c_str()); + "tag.prop and edgetype.prop are supported in fetch sentence."); } - if (labelSchema_->getFieldIndex(pair.second) == -1) { - return Status::Error("`%s' is not a prop of `%s'", - pair.second.c_str(), pair.first.c_str()); + auto aliasProps = expCtx_->aliasProps(); + for (auto &pair : aliasProps) { + auto& tagName = pair.first; + auto& prop = pair.second; + if (tagNameSet_.find(tagName) == tagNameSet_.end()) { + return Status::SyntaxError( + "Near [%s.%s], tag should be declared in `ON' clause first.", + tagName.c_str(), prop.c_str()); + } + auto tagStatus = ectx()->schemaManager()->toTagID(spaceId_, tagName); + if (!tagStatus.ok()) { + return tagStatus.status(); + } + auto tagId = tagStatus.value(); + + std::shared_ptr tagSchema = + ectx()->schemaManager()->getTagSchema(spaceId_, tagId); + if (tagSchema == nullptr) { + return Status::Error("No tag schema for %s", tagName.c_str()); + } + if (tagSchema->getFieldIndex(prop) == -1) { + return Status::Error( + "`%s' is not a prop of `%s'", tagName.c_str(), prop.c_str()); + } + storage::cpp2::PropDef pd; + pd.owner = storage::cpp2::PropOwner::SOURCE; + pd.name = prop; + pd.id.set_tag_id(tagId); + props_.emplace_back(std::move(pd)); } } return Status::OK(); } -void FetchVerticesExecutor::execute() { - auto status = prepareClauses(); +Status FetchVerticesExecutor::prepareClauses() { + DCHECK(sentence_ != nullptr); + spaceId_ = ectx()->rctx()->session()->space(); + expCtx_ = std::make_unique(); + expCtx_->setStorageClient(ectx()->getStorageClient()); + expCtx_->setSpace(spaceId_); + + Status status; + do { + status = checkIfGraphSpaceChosen(); + if (!status.ok()) { + break; + } + yieldClause_ = sentence_->yieldClause(); + if (yieldClause_ != nullptr) { + distinct_ = yieldClause_->isDistinct(); + } + status = prepareVids(); + if (!status.ok()) { + break; + } + status = prepareTags(); + if (!status.ok()) { + break; + } + status = prepareYield(); + if (!status.ok()) { + break; + } + } while (0); + if (!status.ok()) { - doError(std::move(status)); - return; + LOG(ERROR) << "Preparing failed: " << status; + return status; + } + return status; +} + +void FetchVerticesExecutor::onEmptyInputs() { + if (onResult_) { + auto outputs = std::make_unique(std::move(colNames_)); + onResult_(std::move(outputs)); + } else if (resp_ == nullptr) { + resp_ = std::make_unique(); + resp_->set_column_names(std::move(colNames_)); } + doFinish(Executor::ProcessControl::kNext); +} - status = setupVids(); +void FetchVerticesExecutor::execute() { + auto status = prepareClauses(); if (!status.ok()) { doError(std::move(status)); return; @@ -131,22 +331,17 @@ void FetchVerticesExecutor::execute() { onEmptyInputs(); return; } - fetchVertices(); } void FetchVerticesExecutor::fetchVertices() { - std::vector props; - if (!sentence_->isAllTagProps()) { - props = getPropNames(); - } - - auto future = ectx()->getStorageClient()->getVertexProps(spaceId_, vids_, std::move(props)); + auto future = ectx()->getStorageClient()->getVertexProps( + spaceId_, vids_, std::move(props_)); auto *runner = ectx()->rctx()->runner(); auto cb = [this] (RpcResponse &&result) mutable { auto completeness = result.completeness(); if (completeness == 0) { - doError(Status::Error("Get tag `%s' props failed", sentence_->tag()->c_str())); + doError(Status::Error("Get tag props failed")); return; } else if (completeness != 100) { LOG(INFO) << "Get vertices partially failed: " << completeness << "%"; @@ -155,277 +350,293 @@ void FetchVerticesExecutor::fetchVertices() { << "error code: " << static_cast(error.second); } } - if (!sentence_->isAllTagProps()) { - processResult(std::move(result)); - } else { - processAllPropsResult(std::move(result)); - } + processResult(std::move(result)); return; }; auto error = [this] (auto &&e) { - auto msg = folly::stringPrintf("Get tag `%s' props exception: %s.", - sentence_->tag()->c_str(), e.what().c_str()); + auto msg = folly::stringPrintf("Get tag props exception: %s.", e.what().c_str()); LOG(ERROR) << msg; doError(Status::Error(std::move(msg))); }; std::move(future).via(runner).thenValue(cb).thenError(error); } -std::vector FetchVerticesExecutor::getPropNames() { - std::vector props; - for (auto &prop : expCtx_->aliasProps()) { - storage::cpp2::PropDef pd; - pd.owner = storage::cpp2::PropOwner::SOURCE; - pd.name = prop.second; - pd.id.set_tag_id(tagId_); - props.emplace_back(std::move(pd)); - } - - return props; -} - void FetchVerticesExecutor::processResult(RpcResponse &&result) { auto all = result.responses(); std::shared_ptr outputSchema; std::unique_ptr rsWriter; - Getters getters; + size_t num = 0; for (auto &resp : all) { - if (!resp.__isset.vertices) { + num += resp.vertices.size(); + } + if (num == 0) { + finishExecution(std::move(rsWriter)); + return; + } + std::unordered_map>> dataMap; + dataMap.reserve(num); + + std::unordered_map> tagSchemaMap; + std::set tagIdSet; + for (auto &resp : all) { + if (!resp.__isset.vertices || resp.vertices.empty()) { continue; } + auto *vertexSchema = resp.get_vertex_schema(); + if (vertexSchema != nullptr) { + std::transform(vertexSchema->cbegin(), vertexSchema->cend(), + std::inserter(tagSchemaMap, tagSchemaMap.begin()), [](auto &s) { + return std::make_pair( + s.first, std::make_shared(s.second)); + }); + } - if (resultColNames_.empty()) { - for (auto &vdata : resp.vertices) { - if (outputSchema == nullptr) { - outputSchema = std::make_shared(); - outputSchema->appendCol("VertexID", nebula::cpp2::SupportedType::VID); - rsWriter = std::make_unique(outputSchema); - } - auto writer = std::make_unique(outputSchema); - (*writer) << vdata.vertex_id; - std::string encode = writer->encode(); - rsWriter->addRow(std::move(encode)); - } - } else { - auto *schema = resp.get_vertex_schema(); - if (schema == nullptr) { + for (auto &vdata : resp.vertices) { + if (!vdata.__isset.tag_data || vdata.tag_data.empty()) { continue; } - - std::unordered_map> tagSchema; - std::transform(schema->cbegin(), schema->cend(), - std::inserter(tagSchema, tagSchema.begin()), [](auto &s) { - return std::make_pair( - s.first, std::make_shared(s.second)); - }); - - for (auto &vdata : resp.vertices) { - std::unique_ptr vreader; - if (!vdata.__isset.tag_data || vdata.tag_data.empty()) { - continue; - } - - auto vschema = tagSchema[vdata.tag_data[0].tag_id]; - vreader = RowReader::getRowReader(vdata.tag_data[0].data, vschema); - if (outputSchema == nullptr) { - outputSchema = std::make_shared(); - outputSchema->appendCol("VertexID", nebula::cpp2::SupportedType::VID); - auto status = getOutputSchema(vschema.get(), vreader.get(), outputSchema.get()); - if (!status.ok()) { - LOG(ERROR) << "Get output schema failed: " << status; - doError(Status::Error("Get output schema failed: %s.", - status.toString().c_str())); + for (auto& tagData : vdata.tag_data) { + auto& data = tagData.data; + VertexID vid = vdata.vertex_id; + TagID tagId = tagData.tag_id; + if (tagSchemaMap.find(tagId) == tagSchemaMap.end()) { + auto ver = RowReader::getSchemaVer(data); + if (ver < 0) { + LOG(ERROR) << "Found schema version negative " << ver; + doError(Status::Error("Found schema version negative: %d", ver)); return; } - rsWriter = std::make_unique(outputSchema); - } - - auto writer = std::make_unique(outputSchema); - (*writer) << vdata.vertex_id; - getters.getAliasProp = - [&vreader, &vschema] (const std::string&, - const std::string &prop) -> OptVariantType { - return Collector::getProp(vschema.get(), prop, vreader.get()); - }; - for (auto *column : yields_) { - auto *expr = column->expr(); - auto value = expr->eval(getters); - if (!value.ok()) { - doError(std::move(value).status()); - return; - } - auto status = Collector::collect(value.value(), writer.get()); - if (!status.ok()) { - LOG(ERROR) << "Collect prop error: " << status; - doError(std::move(status)); - return; + auto schema = ectx()->schemaManager()->getTagSchema(spaceId_, tagId, ver); + if (schema == nullptr) { + VLOG(3) << "Schema not found for tag id: " << tagId; + // Ignore the bad data. + continue; } + tagSchemaMap[tagId] = schema; } - // TODO Consider float/double, and need to reduce mem copy. - std::string encode = writer->encode(); - rsWriter->addRow(std::move(encode)); - } // for `vdata' + auto vschema = tagSchemaMap[tagId]; + auto vreader = RowReader::getRowReader(data, vschema); + dataMap[vid][tagId] = std::move(vreader); + tagIdSet.insert(tagId); + } } - } // for `resp' - - finishExecution(std::move(rsWriter)); -} - -Status FetchVerticesExecutor::setupVids() { - Status status = Status::OK(); - if (sentence_->isRef() && !sentence_->isAllTagProps()) { - status = setupVidsFromRef(); - } else { - status = setupVidsFromExpr(); } - return status; -} - -Status FetchVerticesExecutor::setupVidsFromExpr() { - Status status = Status::OK(); - std::unique_ptr> uniqID; - if (distinct_) { - uniqID = std::make_unique>(); + if (yieldClause_ == nullptr) { + for (TagID tagId : tagIdSet) { + auto tagSchema = tagSchemaMap[tagId]; + auto tagFound = ectx()->schemaManager()->toTagName(spaceId_, tagId); + if (!tagFound.ok()) { + VLOG(3) << "Tag name not found for tag id: " << tagId; + // Ignore the bad data. + continue; + } + auto tagName = std::move(tagFound).value(); + + for (auto iter = tagSchema->begin(); iter != tagSchema->end(); ++iter) { + auto *ref = new std::string(""); + auto *alias = new std::string(tagName); + auto *prop = iter->getName(); + Expression *expr = + new AliasPropertyExpression(ref, alias, new std::string(prop)); + auto *column = new YieldColumn(expr); + yieldColsHolder_.addColumn(column); + yields_.emplace_back(column); + colNames_.emplace_back(expr->toString()); + colTypes_.emplace_back(nebula::cpp2::SupportedType::UNKNOWN); + } + } } - expCtx_->setSpace(spaceId_); - auto vidList = sentence_->vidList(); - Getters getters; - for (auto *expr : vidList) { - expr->setContext(expCtx_.get()); - status = expr->prepare(); - if (!status.ok()) { - break; - } - auto value = expr->eval(getters); - if (!value.ok()) { - return value.status(); - } - auto v = value.value(); - if (!Expression::isInt(v)) { - status = Status::Error("Vertex ID should be of type integer"); - break; + if (fromType_ == kRef) { + if (inputs_p_ == nullptr) { + LOG(ERROR) << "inputs is nullptr."; + doError(Status::Error("inputs is nullptr.")); + return; } - auto valInt = Expression::asInt(v); - if (distinct_) { - auto result = uniqID->emplace(valInt); - if (result.second) { - vids_.emplace_back(valInt); + auto visitor = [&, this] (const RowReader *reader) -> Status { + VertexID vid = 0; + auto rc = reader->getVid(*colname_, vid); + if (rc != ResultType::SUCCEEDED) { + return Status::Error("Column `%s' not found", colname_->c_str()); } - } else { - vids_.emplace_back(valInt); - } - } + if (dataMap.find(vid) == dataMap.end() && !expCtx_->hasInputProp()) { + return Status::OK(); + } + auto& ds = dataMap[vid]; + + std::vector record; + auto schema = reader->getSchema().get(); + Getters getters; + getters.getVariableProp = [&] (const std::string &prop) -> OptVariantType { + if (prop == "VertexID") { + return OptVariantType(vid); + } + return Collector::getProp(schema, prop, reader); + }; + getters.getInputProp = [&] (const std::string &prop) -> OptVariantType { + if (prop == "VertexID") { + return OptVariantType(vid); + } + return Collector::getProp(schema, prop, reader); + }; + getters.getAliasProp = [&] (const std::string& tagName, const std::string &prop) -> OptVariantType { + auto tagIdStatus = ectx()->schemaManager()->toTagID(spaceId_, tagName); + if (!tagIdStatus.ok()) { + return tagIdStatus.status(); + } + TagID tagId = std::move(tagIdStatus).value(); + if (ds.find(tagId) != ds.end()) { + auto vreader = ds[tagId].get(); + auto vschema = vreader->getSchema().get(); + return Collector::getProp(vschema, prop, vreader); + } else { + auto ts = ectx()->schemaManager()->getTagSchema(spaceId_, tagId); + if (ts == nullptr) { + return Status::Error("No tag schema for %s", tagName.c_str()); + } + return RowReader::getDefaultProp(ts.get(), prop); + } + }; - return status; -} + for (auto *column : yields_) { + auto *expr = column->expr(); + auto value = expr->eval(getters); + if (!value.ok()) { + return value.status(); + } + record.emplace_back(std::move(value).value()); + } + if (outputSchema == nullptr) { + outputSchema = std::make_shared(); + rsWriter = std::make_unique(outputSchema); + auto getSchemaStatus = Collector::getSchema(record, colNames_, colTypes_, outputSchema.get()); + if (!getSchemaStatus.ok()) { + return getSchemaStatus; + } + } + auto writer = std::make_unique(outputSchema); + for (auto& value : record) { + auto status = Collector::collect(value, writer.get()); + if (!status.ok()) { + return status; + } + } + rsWriter->addRow(*writer); + return Status::OK(); + }; -Status FetchVerticesExecutor::setupVidsFromRef() { - const InterimResult *inputs; - if (varname_ == nullptr) { - inputs = inputs_.get(); - } else { - bool existing = false; - inputs = ectx()->variableHolder()->get(*varname_, &existing); - if (!existing) { - return Status::Error("Variable `%s' not defined", varname_->c_str()); + Status status = inputs_p_->applyTo(visitor); + if (!status.ok()) { + LOG(ERROR) << "inputs visit failed. " << status.toString(); + doError(status); + return; } - } - if (inputs == nullptr || !inputs->hasData()) { - return Status::OK(); - } - - auto status = checkIfDuplicateColumn(); - if (!status.ok()) { - return status; - } - StatusOr> result; - if (distinct_) { - result = inputs->getDistinctVIDs(*colname_); } else { - result = inputs->getVIDs(*colname_); - } - if (!result.ok()) { - return std::move(result).status(); - } - vids_ = std::move(result).value(); - return Status::OK(); -} - -void FetchVerticesExecutor::processAllPropsResult(RpcResponse &&result) { - auto &all = result.responses(); - std::unique_ptr rsWriter; - std::shared_ptr outputSchema; - for (auto &resp : all) { - if (!resp.__isset.vertices) { - continue; - } - - for (auto &vdata : resp.vertices) { - if (!vdata.__isset.tag_data || vdata.tag_data.empty()) { + for (auto vid : vids_) { + auto iter = dataMap.find(vid); + if (iter == dataMap.end()) { continue; } - RowWriter writer; - writer << RowWriter::ColType(nebula::cpp2::SupportedType::VID) << vdata.vertex_id; - for (auto &tdata : vdata.tag_data) { - auto ver = RowReader::getSchemaVer(tdata.data); - if (ver < 0) { - LOG(ERROR) << "Found schema version negative " << ver; - doError(Status::Error("Found schema version negative: %d", ver)); - return; + if (dataMap.find(vid) == dataMap.end() && !expCtx_->hasInputProp()) { + continue; + } + auto& ds = dataMap[vid]; + std::vector record; + Getters getters; + getters.getInputProp = [&] (const std::string &prop) -> OptVariantType { + if (prop == "VertexID") { + return OptVariantType(vid); } - auto schema = ectx()->schemaManager()->getTagSchema(spaceId_, tdata.tag_id, ver); - if (schema == nullptr) { - VLOG(3) << "Schema not found for tag id: " << tdata.tag_id; - // Ignore the bad data. - continue; + std::string errMsg = + folly::stringPrintf("Unknown input prop: %s", prop.c_str()); + return OptVariantType(Status::Error(errMsg)); + }; + getters.getAliasProp = [&] (const std::string& tagName, const std::string &prop) -> OptVariantType { + auto tagIdStatus = ectx()->schemaManager()->toTagID(spaceId_, tagName); + if (!tagIdStatus.ok()) { + return tagIdStatus.status(); } - if (rsWriter == nullptr) { - outputSchema = std::make_shared(); - outputSchema->appendCol("VertexID", nebula::cpp2::SupportedType::VID); - returnColNames_.emplace_back("VertexID"); - rsWriter = std::make_unique(outputSchema); + TagID tagId = std::move(tagIdStatus).value(); + if (ds.find(tagId) != ds.end()) { + auto vreader = ds[tagId].get(); + auto vschema = vreader->getSchema().get(); + return Collector::getProp(vschema, prop, vreader); + } else { + auto ts = ectx()->schemaManager()->getTagSchema(spaceId_, tagId); + if (ts == nullptr) { + return Status::Error("No tag schema for %s", tagName.c_str()); + } + return RowReader::getDefaultProp(ts.get(), prop); } - // row.append(tdata.data); - auto reader = RowReader::getRowReader(tdata.data, schema); + }; - auto tagFound = ectx()->schemaManager()->toTagName(spaceId_, tdata.tag_id); - if (!tagFound.ok()) { - VLOG(3) << "Tag name not found for tag id: " << tdata.tag_id; - // Ignore the bad data. - continue; + for (auto *column : yields_) { + auto *expr = column->expr(); + auto value = expr->eval(getters); + if (!value.ok()) { + doError(value.status()); + return; } - auto tagName = std::move(tagFound).value(); - auto iter = schema->begin(); - auto index = 0; - - while (iter) { - auto *field = iter->getName(); - auto prop = RowReader::getPropByIndex(reader.get(), index); - if (!ok(prop)) { - LOG(ERROR) << "Read props of tag " << tagName << " failed."; - doError(Status::Error("Read props of tag `%s' failed.", tagName.c_str())); - return; - } - Collector::collectWithoutSchema(value(prop), &writer); - auto colName = folly::stringPrintf("%s.%s", tagName.c_str(), field); - resultColNames_.emplace_back(colName); - returnColNames_.emplace_back(colName); - auto fieldType = iter->getType(); - outputSchema->appendCol(std::move(colName), std::move(fieldType)); - ++index; - ++iter; + record.emplace_back(std::move(value).value()); + } + if (outputSchema == nullptr) { + outputSchema = std::make_shared(); + rsWriter = std::make_unique(outputSchema); + auto getSchemaStatus = Collector::getSchema(record, colNames_, colTypes_, outputSchema.get()); + if (!getSchemaStatus.ok()) { + doError(getSchemaStatus); + return; } } - if (writer.size() > 1 && rsWriter != nullptr) { - rsWriter->addRow(writer.encode()); + auto writer = std::make_unique(outputSchema); + for (auto& value : record) { + auto status = Collector::collect(value, writer.get()); + if (!status.ok()) { + doError(status); + return; + } } + rsWriter->addRow(*writer); } } finishExecution(std::move(rsWriter)); } + +void FetchVerticesExecutor::setupResponse(cpp2::ExecutionResponse &resp) { + if (resp_ == nullptr) { + resp_ = std::make_unique(); + resp_->set_column_names(std::move(colNames_)); + } + resp = std::move(*resp_); +} + +void FetchVerticesExecutor::finishExecution(std::unique_ptr rsWriter) { + auto outputs = std::make_unique(std::move(colNames_)); + if (rsWriter != nullptr) { + outputs->setInterim(std::move(rsWriter)); + } + + if (onResult_) { + onResult_(std::move(outputs)); + } else { + resp_ = std::make_unique(); + auto colNames = outputs->getColNames(); + resp_->set_column_names(std::move(colNames)); + if (outputs->hasData()) { + auto ret = outputs->getRows(); + if (!ret.ok()) { + LOG(ERROR) << "Get rows failed: " << ret.status(); + doError(std::move(ret).status()); + return; + } + resp_->set_rows(std::move(ret).value()); + } + } + doFinish(Executor::ProcessControl::kNext); +} + } // namespace graph } // namespace nebula diff --git a/src/graph/FetchVerticesExecutor.h b/src/graph/FetchVerticesExecutor.h index c92ec5dddfe..e7bd19fcf9a 100644 --- a/src/graph/FetchVerticesExecutor.h +++ b/src/graph/FetchVerticesExecutor.h @@ -8,14 +8,14 @@ #define GRAPH_FETCHVERTICESEXECUTOR_H_ #include "base/Base.h" -#include "graph/FetchExecutor.h" +#include "graph/TraverseExecutor.h" #include "storage/client/StorageClient.h" #include "meta/SchemaProviderIf.h" #include "dataman/SchemaWriter.h" namespace nebula { namespace graph { -class FetchVerticesExecutor final : public FetchExecutor { +class FetchVerticesExecutor final : public TraverseExecutor { public: FetchVerticesExecutor(Sentence *sentence, ExecutionContext *ectx); @@ -27,34 +27,49 @@ class FetchVerticesExecutor final : public FetchExecutor { void execute() override; -private: - Status prepareClauses(); - - Status prepareVids(); - - Status checkTagProps(); + void setupResponse(cpp2::ExecutionResponse &resp) override; - Status setupVids(); +private: + Status prepareTags(); - Status setupVidsFromRef(); + Status prepareYield(); - Status setupVidsFromExpr(); + Status prepareClauses(); - std::vector getPropNames(); + Status prepareVids(); void fetchVertices(); + void onEmptyInputs(); + using RpcResponse = storage::StorageRpcResponse; void processResult(RpcResponse &&result); - void processAllPropsResult(RpcResponse &&result); + void finishExecution(std::unique_ptr rsWriter); + enum FromType { + kInstantExpr, + kRef, + }; private: + GraphSpaceID spaceId_{-1}; + FromType fromType_{kInstantExpr}; + YieldClause *yieldClause_{nullptr}; + std::unique_ptr resp_; + std::unique_ptr expCtx_; + std::vector yields_; + YieldColumns yieldColsHolder_; + bool distinct_{false}; FetchVerticesSentence *sentence_{nullptr}; + std::vector colNames_; + std::vector colTypes_; std::vector vids_; - TagID tagId_{INT_MIN}; - std::string *varname_{nullptr}; - std::string *colname_{nullptr}; + std::vector tagNames_; + std::unordered_set tagNameSet_; + std::vector tagIds_; + std::vector props_; + const InterimResult* inputs_p_{nullptr}; + std::string* colname_{nullptr}; }; } // namespace graph } // namespace nebula diff --git a/src/graph/TraverseExecutor.cpp b/src/graph/TraverseExecutor.cpp index 0dd473a9912..40bc103114a 100644 --- a/src/graph/TraverseExecutor.cpp +++ b/src/graph/TraverseExecutor.cpp @@ -281,7 +281,9 @@ OptVariantType Collector::getProp(const meta::SchemaProviderIf *schema, } default: std::string errMsg = - folly::stringPrintf("Unknown type: %d", static_cast(type)); + folly::stringPrintf( + "Unknown type: %d, prop: %s", + static_cast(type), prop.c_str()); LOG(ERROR) << errMsg; return Status::Error(errMsg); } diff --git a/src/graph/test/FetchVerticesTest.cpp b/src/graph/test/FetchVerticesTest.cpp index 80e9026df74..15d759731b9 100644 --- a/src/graph/test/FetchVerticesTest.cpp +++ b/src/graph/test/FetchVerticesTest.cpp @@ -83,6 +83,20 @@ TEST_F(FetchVerticesTest, Base) { }; ASSERT_TRUE(verifyResult(resp, expected)); } + { + cpp2::ExecutionResponse resp; + auto &player = players_["Boris Diaw"]; + auto *fmt = "GO FROM %ld over like YIELD like._dst as id" + "| FETCH PROP ON player $-.id YIELD player.name, player.age, $-.*"; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + + std::vector expectedColNames{ + {"VertexID"}, {"player.name"}, {"player.age"}, {"$-.id"} + }; + ASSERT_TRUE(verifyColNames(resp, expectedColNames)); + } { cpp2::ExecutionResponse resp; auto &player = players_["Boris Diaw"]; @@ -277,13 +291,6 @@ TEST_F(FetchVerticesTest, SyntaxError) { auto code = client_->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::E_SYNTAX_ERROR, code); } - { - cpp2::ExecutionResponse resp; - auto query = "GO FROM 11 over like YIELD like._dst as id " - "| FETCH PROP ON player 11 YIELD $-.id"; - auto code = client_->execute(query, resp); - ASSERT_EQ(cpp2::ErrorCode::E_SYNTAX_ERROR, code); - } } TEST_F(FetchVerticesTest, NonexistentTag) { @@ -352,6 +359,39 @@ TEST_F(FetchVerticesTest, FetchAll) { }; ASSERT_TRUE(verifyResult(resp, expected)); } + { + cpp2::ExecutionResponse resp; + auto &player = players_["Boris Diaw"]; + auto *fmt = "YIELD %ld as id | FETCH PROP ON * $-.id "; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector expectedColNames{ + {"VertexID"}, {"player.name"}, {"player.age"} + }; + ASSERT_TRUE(verifyColNames(resp, expectedColNames)); + std::vector> expected = { + {player.vid(), player.name(), player.age()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + { + cpp2::ExecutionResponse resp; + auto &player = players_["Boris Diaw"]; + auto *fmt = "FETCH PROP ON * %ld, %ld "; + auto query = folly::stringPrintf(fmt, player.vid(), player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector expectedColNames{ + {"VertexID"}, {"player.name"}, {"player.age"} + }; + ASSERT_TRUE(verifyColNames(resp, expectedColNames)); + std::vector> expected = { + {player.vid(), player.name(), player.age()}, + {player.vid(), player.name(), player.age()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } { cpp2::ExecutionResponse resp; auto &player = players_["Tim Duncan"]; @@ -387,6 +427,63 @@ TEST_F(FetchVerticesTest, FetchAll) { }; ASSERT_TRUE(verifyResult(resp, expected)); } + { + cpp2::ExecutionResponse resp; + auto &player = players_["Tim Duncan"]; + auto *fmt = "YIELD %ld as id | FETCH PROP ON * $-.id "; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector expectedColNames{ + {"VertexID"}, {"player.name"}, {"player.age"}, + {"bachelor.name"}, {"bachelor.speciality"} + }; + ASSERT_TRUE(verifyColNames(resp, expectedColNames)); + std::vector> expected = { + {player.vid(), player.name(), player.age(), + bachelors_["Tim Duncan"].name(), bachelors_["Tim Duncan"].speciality()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + { + cpp2::ExecutionResponse resp; + auto &player = players_["Tim Duncan"]; + auto *fmt = "FETCH PROP ON * %ld, %ld "; + auto query = folly::stringPrintf(fmt, player.vid(), player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector expectedColNames{ + {"VertexID"}, {"player.name"}, {"player.age"}, + {"bachelor.name"}, {"bachelor.speciality"} + }; + ASSERT_TRUE(verifyColNames(resp, expectedColNames)); + std::vector> expected = { + {player.vid(), player.name(), player.age(), + bachelors_["Tim Duncan"].name(), bachelors_["Tim Duncan"].speciality()}, + {player.vid(), player.name(), player.age(), + bachelors_["Tim Duncan"].name(), bachelors_["Tim Duncan"].speciality()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + { + cpp2::ExecutionResponse resp; + auto &player = players_["Tim Duncan"]; + auto *fmt = "FETCH PROP ON * %ld, %ld YIELD player.name, player.age "; + auto query = folly::stringPrintf(fmt, player.vid(), player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector expectedColNames{ + {"VertexID"}, {"player.name"}, {"player.age"} + }; + ASSERT_TRUE(verifyColNames(resp, expectedColNames)); + std::vector> expected = { + {player.vid(), player.name(), player.age()}, + {player.vid(), player.name(), player.age()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } } TEST_F(FetchVerticesTest, DuplicateColumnName) { diff --git a/src/meta/SchemaManager.h b/src/meta/SchemaManager.h index 8ccfbf6ef15..8a32387d38c 100644 --- a/src/meta/SchemaManager.h +++ b/src/meta/SchemaManager.h @@ -46,6 +46,8 @@ class SchemaManager { virtual StatusOr> getAllEdge(GraphSpaceID space) = 0; + virtual StatusOr> getAllTag(GraphSpaceID space) = 0; + virtual void init(MetaClient *client = nullptr) = 0; protected: diff --git a/src/meta/ServerBasedSchemaManager.cpp b/src/meta/ServerBasedSchemaManager.cpp index 8bd8817589e..5d956cce2b1 100644 --- a/src/meta/ServerBasedSchemaManager.cpp +++ b/src/meta/ServerBasedSchemaManager.cpp @@ -108,5 +108,10 @@ StatusOr> ServerBasedSchemaManager::getAllEdge(GraphSpa return metaClient_->getAllEdgeFromCache(space); } +StatusOr> ServerBasedSchemaManager::getAllTag(GraphSpaceID space) { + CHECK(metaClient_); + return metaClient_->getAllTagFromCache(space); +} + } // namespace meta } // namespace nebula diff --git a/src/meta/ServerBasedSchemaManager.h b/src/meta/ServerBasedSchemaManager.h index 50f8a9b6011..5c37cc43bea 100644 --- a/src/meta/ServerBasedSchemaManager.h +++ b/src/meta/ServerBasedSchemaManager.h @@ -46,6 +46,8 @@ class ServerBasedSchemaManager : public SchemaManager { StatusOr> getAllEdge(GraphSpaceID space) override; + StatusOr> getAllTag(GraphSpaceID space) override; + void init(MetaClient *client) override; private: diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 5b5da0ebe54..8a8d4fd4b9a 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -172,6 +172,7 @@ bool MetaClient::loadData() { decltype(spaceEdgeIndexByType_) spaceEdgeIndexByType; decltype(spaceTagIndexById_) spaceTagIndexById; decltype(spaceAllEdgeMap_) spaceAllEdgeMap; + decltype(spaceAllTagMap_) spaceAllTagMap; for (auto space : ret.value()) { auto spaceId = space.first; @@ -198,7 +199,8 @@ bool MetaClient::loadData() { spaceEdgeIndexByType, spaceNewestTagVerMap, spaceNewestEdgeVerMap, - spaceAllEdgeMap)) { + spaceAllEdgeMap, + spaceAllTagMap)) { LOG(ERROR) << "Load Schemas Failed"; return false; } @@ -225,6 +227,7 @@ bool MetaClient::loadData() { spaceEdgeIndexByType_ = std::move(spaceEdgeIndexByType); spaceTagIndexById_ = std::move(spaceTagIndexById); spaceAllEdgeMap_ = std::move(spaceAllEdgeMap); + spaceAllTagMap_ = std::move(spaceAllTagMap); } localDataLastUpdateTime_.store(metadLastUpdateTime_.load()); diff(oldCache, localCache_); @@ -279,7 +282,8 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, SpaceEdgeTypeNameMap &edgeTypeNameMap, SpaceNewestTagVerMap &newestTagVerMap, SpaceNewestEdgeVerMap &newestEdgeVerMap, - SpaceAllEdgeMap &allEdgeMap) { + SpaceAllEdgeMap &allEdgeMap, + SpaceAllTagMap &allTagMap) { auto tagRet = listTagSchemas(spaceId).get(); if (!tagRet.ok()) { LOG(ERROR) << "Get tag schemas failed for spaceId " << spaceId << ", " << tagRet.status(); @@ -293,6 +297,7 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, } allEdgeMap[spaceId] = {}; + allTagMap[spaceId] = {}; auto tagItemVec = tagRet.value(); auto edgeItemVec = edgeRet.value(); spaceInfoCache->tagItemVec_ = tagItemVec; @@ -300,9 +305,15 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, spaceInfoCache->edgeItemVec_ = edgeItemVec; spaceInfoCache->edgeSchemas_ = __buildEdgeSchemas(edgeItemVec); + std::unordered_set> tags; for (auto& tagIt : tagItemVec) { tagNameIdMap.emplace(std::make_pair(spaceId, tagIt.tag_name), tagIt.tag_id); tagIdNameMap.emplace(std::make_pair(spaceId, tagIt.tag_id), tagIt.tag_name); + if (tags.find({spaceId, tagIt.tag_id}) != tags.cend()) { + continue; + } + tags.emplace(spaceId, tagIt.tag_id); + allTagMap[spaceId].emplace_back(tagIt.tag_name); // get the latest tag version auto it = newestTagVerMap.find(std::make_pair(spaceId, tagIt.tag_id)); if (it != newestTagVerMap.end()) { @@ -415,6 +426,7 @@ const MetaClient::ThreadLocalInfo& MetaClient::getThreadLocalInfo() { threadLocalInfo.spaceEdgeIndexByType_ = spaceEdgeIndexByType_; threadLocalInfo.spaceTagIndexById_ = spaceTagIndexById_; threadLocalInfo.spaceAllEdgeMap_ = spaceAllEdgeMap_; + threadLocalInfo.spaceAllTagMap_ = spaceAllTagMap_; } return threadLocalInfo; @@ -921,6 +933,20 @@ StatusOr> MetaClient::getAllEdgeFromCache(const GraphSp return it->second; } +StatusOr> MetaClient::getAllTagFromCache(const GraphSpaceID& space) { + if (!ready_) { + return Status::Error("Not ready!"); + } +// folly::RWSpinLock::ReadHolder holder(localCacheLock_); + const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + auto it = threadLocalInfo.spaceAllTagMap_.find(space); + if (it == threadLocalInfo.spaceAllTagMap_.end()) { + std::string error = folly::stringPrintf("SpaceId `%d' is nonexistent", space); + return Status::Error(std::move(error)); + } + return it->second; +} + folly::Future> MetaClient::multiPut(std::string segment, std::vector> pairs) { diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index 35e24cc0516..3d13842e535 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -82,6 +82,9 @@ using SpaceTagIdNameMap = std::unordered_map, std // get all edgeType edgeName via spaceId using SpaceAllEdgeMap = std::unordered_map>; + +// get all tagId tagName via spaceId +using SpaceAllTagMap = std::unordered_map>; // get leader host via spaceId and partId using LeaderMap = std::unordered_map, HostAddr>; @@ -423,6 +426,9 @@ class MetaClient { // get all lastest version edge StatusOr> getAllEdgeFromCache(const GraphSpaceID& space); + // get all lastest version tag + StatusOr> getAllTagFromCache(const GraphSpaceID& space); + PartsMap getPartsMapFromCache(const HostAddr& host); StatusOr getPartMetaFromCache(GraphSpaceID spaceId, PartitionID partId); @@ -510,7 +516,8 @@ class MetaClient { SpaceEdgeTypeNameMap &edgeTypeNamemap, SpaceNewestTagVerMap &newestTagVerMap, SpaceNewestEdgeVerMap &newestEdgeVerMap, - SpaceAllEdgeMap &allEdgemap); + SpaceAllEdgeMap &allEdgemap, + SpaceAllTagMap &allTagmap); bool loadUsersAndRoles(); @@ -589,6 +596,7 @@ class MetaClient { SpaceNewestTagVerMap spaceNewestTagVerMap_; SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_; SpaceAllEdgeMap spaceAllEdgeMap_; + SpaceAllTagMap spaceAllTagMap_; }; const ThreadLocalInfo& getThreadLocalInfo(); @@ -610,6 +618,7 @@ class MetaClient { SpaceNewestTagVerMap spaceNewestTagVerMap_; SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_; SpaceAllEdgeMap spaceAllEdgeMap_; + SpaceAllTagMap spaceAllTagMap_; UserRolesMap userRolesMap_; UserPasswordMap userPasswordMap_; diff --git a/src/parser/Clauses.cpp b/src/parser/Clauses.cpp index b505d14aaad..6f6f2fd116a 100644 --- a/src/parser/Clauses.cpp +++ b/src/parser/Clauses.cpp @@ -150,6 +150,19 @@ std::string OverEdges::toString() const { return buf; } +std::string FetchLabels::toString() const { + std::string buf; + buf.reserve(256); + for (auto &label : labels_) { + buf += *label; + buf += ","; + } + if (!buf.empty()) { + buf.pop_back(); + } + return buf; +} + std::string OverClause::toString() const { std::string buf; buf.reserve(256); diff --git a/src/parser/Clauses.h b/src/parser/Clauses.h index 1ef221456c1..672e1ddc251 100644 --- a/src/parser/Clauses.h +++ b/src/parser/Clauses.h @@ -259,6 +259,29 @@ class OverEdges final { std::vector> edges_; }; +class FetchLabels final { +public: + void addLabel(std::string *label) { labels_.emplace_back(label); } + + std::vector labels() { + std::vector result; + std::transform(labels_.cbegin(), labels_.cend(), + std::insert_iterator>(result, result.begin()), + [](auto &label) { return label.get(); }); + return result; + } + + std::string toString() const; + + std::string* release() { + auto label = labels_[0].release(); + delete this; + return label; + } +private: + std::vector> labels_; +}; + class OverClause final : public Clause { public: enum class Direction : uint8_t { diff --git a/src/parser/TraverseSentences.cpp b/src/parser/TraverseSentences.cpp index 03a7687bf49..422704ce4fe 100644 --- a/src/parser/TraverseSentences.cpp +++ b/src/parser/TraverseSentences.cpp @@ -133,7 +133,7 @@ std::string FetchVerticesSentence::toString() const { std::string buf; buf.reserve(256); buf += "FETCH PROP ON "; - buf += *tag_; + buf += tags_->toString(); buf += " "; if (isRef()) { buf += vidRef_->toString(); diff --git a/src/parser/TraverseSentences.h b/src/parser/TraverseSentences.h index 335cd4b5a13..e40620f3515 100644 --- a/src/parser/TraverseSentences.h +++ b/src/parser/TraverseSentences.h @@ -291,37 +291,26 @@ class OrderBySentence final : public Sentence { class FetchVerticesSentence final : public Sentence { public: - FetchVerticesSentence(std::string *tag, + FetchVerticesSentence(FetchLabels *tags, VertexIDList *vidList, YieldClause *clause) { kind_ = Kind::kFetchVertices; - tag_.reset(tag); + tags_.reset(tags); vidList_.reset(vidList); yieldClause_.reset(clause); } - FetchVerticesSentence(std::string *tag, + FetchVerticesSentence(FetchLabels *tags, Expression *ref, YieldClause *clause) { kind_ = Kind::kFetchVertices; - tag_.reset(tag); + tags_.reset(tags); vidRef_.reset(ref); yieldClause_.reset(clause); } - explicit FetchVerticesSentence(Expression *vid) { - kind_ = Kind::kFetchVertices; - tag_ = std::make_unique("*"); - vidList_ = std::make_unique(); - vidList_->add(vid); - } - - bool isAllTagProps() { - return *tag_ == "*"; - } - - auto tag() const { - return tag_.get(); + FetchLabels* tags() const { + return tags_.get(); } auto vidList() const { @@ -340,14 +329,10 @@ class FetchVerticesSentence final : public Sentence { return yieldClause_.get(); } - void setYieldClause(YieldClause *clause) { - yieldClause_.reset(clause); - } - std::string toString() const override; private: - std::unique_ptr tag_; + std::unique_ptr tags_; std::unique_ptr vidList_; std::unique_ptr vidRef_; std::unique_ptr yieldClause_; diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 116b33380f9..169d7f0825f 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -54,6 +54,7 @@ static constexpr size_t MAX_ABS_INTEGER = 9223372036854775808ULL; nebula::VertexIDList *vid_list; nebula::OverEdge *over_edge; nebula::OverEdges *over_edges; + nebula::FetchLabels *fetch_labels; nebula::OverClause *over_clause; nebula::WhereClause *where_clause; nebula::WhenClause *when_clause; @@ -159,6 +160,7 @@ static constexpr size_t MAX_ABS_INTEGER = 9223372036854775808ULL; %type vid_list %type over_edge %type over_edges +%type fetch_labels %type over_clause %type where_clause %type when_clause @@ -856,14 +858,32 @@ order_by_sentence ; fetch_vertices_sentence - : KW_FETCH KW_PROP KW_ON name_label vid_list yield_clause { + : KW_FETCH KW_PROP KW_ON fetch_labels vid_list yield_clause { $$ = new FetchVerticesSentence($4, $5, $6); } - | KW_FETCH KW_PROP KW_ON name_label vid_ref_expression yield_clause { + | KW_FETCH KW_PROP KW_ON fetch_labels vid_ref_expression yield_clause { $$ = new FetchVerticesSentence($4, $5, $6); } - | KW_FETCH KW_PROP KW_ON MUL vid { - $$ = new FetchVerticesSentence($5); + | KW_FETCH KW_PROP KW_ON MUL vid_list yield_clause { + auto labels = new nebula::FetchLabels(); + labels->addLabel(new std::string("*")); + $$ = new FetchVerticesSentence(labels, $5, $6); + } + | KW_FETCH KW_PROP KW_ON MUL vid_ref_expression yield_clause { + auto labels = new nebula::FetchLabels(); + labels->addLabel(new std::string("*")); + $$ = new FetchVerticesSentence(labels, $5, $6); + } + ; + +fetch_labels + : name_label { + $$ = new FetchLabels(); + $$->addLabel($1); + } + | fetch_labels COMMA name_label { + $$ = $1; + $$->addLabel($3); } ; @@ -907,12 +927,12 @@ edge_key_ref: ; fetch_edges_sentence - : KW_FETCH KW_PROP KW_ON name_label edge_keys yield_clause { - auto fetch = new FetchEdgesSentence($4, $5, $6); + : KW_FETCH KW_PROP KW_ON fetch_labels edge_keys yield_clause { + auto fetch = new FetchEdgesSentence($4->release(), $5, $6); $$ = fetch; } - | KW_FETCH KW_PROP KW_ON name_label edge_key_ref yield_clause { - auto fetch = new FetchEdgesSentence($4, $5, $6); + | KW_FETCH KW_PROP KW_ON fetch_labels edge_key_ref yield_clause { + auto fetch = new FetchEdgesSentence($4->release(), $5, $6); $$ = fetch; } ; diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index bb57ffdf84f..bc420135edc 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -950,6 +950,30 @@ TEST(Parser, FetchVertex) { auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } + { + GQLParser parser; + std::string query = "FETCH PROP ON person 1, 2, 3 yield person.id"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "yield 1 as id | FETCH PROP ON person $-.id"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "yield 1 as id | FETCH PROP ON person $-.id yield person.id"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "FETCH PROP ON person, another 1, 2, 3"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } { GQLParser parser; std::string query = "FETCH PROP ON person hash(\"dutor\")"; @@ -1029,6 +1053,31 @@ TEST(Parser, FetchVertex) { auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } + { + GQLParser parser; + std::string query = "FETCH PROP ON * 1, 2"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "FETCH PROP ON * $-.id"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "yield 1 as id | FETCH PROP ON * $-.id"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "yield 1 as id | FETCH PROP ON * $-.id yield friend.id, person.id"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + } TEST(Parser, FetchEdge) { @@ -1038,6 +1087,12 @@ TEST(Parser, FetchEdge) { auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } + { + GQLParser parser; + std::string query = "FETCH PROP ON transfer, another 12345 -> -54321"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } { GQLParser parser; std::string query = "FETCH PROP ON transfer 12345 -> -54321 " @@ -1771,7 +1826,9 @@ TEST(Parser, GroupBy) { "F_STD($^.person.name ), " "F_BIT_AND($^.person.name ), " "F_BIT_OR($^.person.name ), " - "F_BIT_XOR($^.person.name )"; + "F_BIT_XOR($^.person.name ), " + "COLLECT_LIST($^.person.name ), " + "COLLECT_SET($^.person.name )"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); diff --git a/src/storage/test/AdHocSchemaManager.h b/src/storage/test/AdHocSchemaManager.h index e905a2bafe2..d5645a28a43 100644 --- a/src/storage/test/AdHocSchemaManager.h +++ b/src/storage/test/AdHocSchemaManager.h @@ -64,6 +64,11 @@ class AdHocSchemaManager final : public nebula::meta::SchemaManager { return Status::Error("Unimplemented"); } + StatusOr> getAllTag(GraphSpaceID) override { + LOG(FATAL) << "Unimplemented"; + return Status::Error("Unimplemented"); + } + void init(nebula::meta::MetaClient *) override { }