diff --git a/src/clients/storage/GraphStorageClient.cpp b/src/clients/storage/GraphStorageClient.cpp index 1c5cb30a589..206502e10e8 100644 --- a/src/clients/storage/GraphStorageClient.cpp +++ b/src/clients/storage/GraphStorageClient.cpp @@ -559,36 +559,68 @@ StorageRpcRespFuture GraphStorageClient::lookupAndTr }); } -folly::Future> GraphStorageClient::scanEdge( - cpp2::ScanEdgeRequest req, folly::EventBase* evb) { - std::pair request; - auto host = this->getLeader(req.get_space_id(), req.get_part_id()); - if (!host.ok()) { - return folly::makeFuture>(host.status()); +StorageRpcRespFuture GraphStorageClient::scanEdge( + const CommonRequestParam& param, + const cpp2::EdgeProp& edgeProp, + int64_t limit, + const Expression* filter) { + std::unordered_map requests; + auto status = getHostPartsWithCursor(param.space); + if (!status.ok()) { + return folly::makeFuture>( + std::runtime_error(status.status().toString())); + } + auto& clusters = status.value(); + for (const auto& c : clusters) { + auto& host = c.first; + auto& req = requests[host]; + req.set_space_id(param.space); + req.set_parts(std::move(c.second)); + req.set_return_columns(edgeProp); + req.set_limit(limit); + if (filter != nullptr) { + req.set_filter(filter->encode()); + } + req.set_common(param.toReqCommon()); } - request.first = std::move(host).value(); - request.second = std::move(req); - return getResponse(evb, - std::move(request), - [](cpp2::GraphStorageServiceAsyncClient* client, - const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); }); + return collectResponse(param.evb, + std::move(requests), + [](cpp2::GraphStorageServiceAsyncClient* client, + const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); }); } -folly::Future> GraphStorageClient::scanVertex( - cpp2::ScanVertexRequest req, folly::EventBase* evb) { - std::pair request; - auto host = this->getLeader(req.get_space_id(), req.get_part_id()); - if (!host.ok()) { - return folly::makeFuture>(host.status()); +StorageRpcRespFuture GraphStorageClient::scanVertex( + const CommonRequestParam& param, + const std::vector& vertexProp, + int64_t limit, + const Expression* filter) { + std::unordered_map requests; + auto status = getHostPartsWithCursor(param.space); + if (!status.ok()) { + return folly::makeFuture>( + std::runtime_error(status.status().toString())); + } + auto& clusters = status.value(); + for (const auto& c : clusters) { + auto& host = c.first; + auto& req = requests[host]; + req.set_space_id(param.space); + req.set_parts(std::move(c.second)); + req.set_return_columns(vertexProp); + req.set_limit(limit); + if (filter != nullptr) { + req.set_filter(filter->encode()); + } + req.set_common(param.toReqCommon()); } - request.first = std::move(host).value(); - request.second = std::move(req); - return getResponse(evb, - std::move(request), - [](cpp2::GraphStorageServiceAsyncClient* client, - const cpp2::ScanVertexRequest& r) { return client->future_scanVertex(r); }); + return collectResponse( + param.evb, + std::move(requests), + [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::ScanVertexRequest& r) { + return client->future_scanVertex(r); + }); } StatusOr> GraphStorageClient::getIdFromRow( diff --git a/src/clients/storage/GraphStorageClient.h b/src/clients/storage/GraphStorageClient.h index 9ce6258e899..9b917b36add 100644 --- a/src/clients/storage/GraphStorageClient.h +++ b/src/clients/storage/GraphStorageClient.h @@ -130,11 +130,16 @@ class GraphStorageClient : public StorageClientBase lookupAndTraverse( const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec); - folly::Future> scanEdge(cpp2::ScanEdgeRequest req, - folly::EventBase* evb = nullptr); + StorageRpcRespFuture scanEdge(const CommonRequestParam& param, + const cpp2::EdgeProp& vertexProp, + int64_t limit, + const Expression* filter); - folly::Future> scanVertex(cpp2::ScanVertexRequest req, - folly::EventBase* evb = nullptr); + StorageRpcRespFuture scanVertex( + const CommonRequestParam& param, + const std::vector& vertexProp, + int64_t limit, + const Expression* filter); private: StatusOr> getIdFromRow(GraphSpaceID space, diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index ddb69a53232..6a3f7ac9651 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -329,5 +329,28 @@ StorageClientBase::getHostParts(GraphSpaceID spaceId) const { return hostParts; } +template +StatusOr>> +StorageClientBase::getHostPartsWithCursor(GraphSpaceID spaceId) const { + std::unordered_map> hostParts; + auto status = metaClient_->partsNum(spaceId); + if (!status.ok()) { + return Status::Error("Space not found, spaceid: %d", spaceId); + } + + // TODO support cursor + cpp2::ScanCursor c; + c.set_has_next(false); + auto parts = status.value(); + for (auto partId = 1; partId <= parts; partId++) { + auto leader = getLeader(spaceId, partId); + if (!leader.ok()) { + return leader.status(); + } + hostParts[leader.value()].emplace(partId, c); + } + return hostParts; +} + } // namespace storage } // namespace nebula diff --git a/src/clients/storage/StorageClientBase.h b/src/clients/storage/StorageClientBase.h index 900e4001e22..a9d347d10ed 100644 --- a/src/clients/storage/StorageClientBase.h +++ b/src/clients/storage/StorageClientBase.h @@ -166,6 +166,9 @@ class StorageClientBase { std::unordered_map>>> clusterIdsToHosts(GraphSpaceID spaceId, const Container& ids, GetIdFunc f) const; + StatusOr>> + getHostPartsWithCursor(GraphSpaceID spaceId) const; + virtual StatusOr getPartHosts(GraphSpaceID spaceId, PartitionID partId) const { CHECK(metaClient_ != nullptr); return metaClient_->getPartHostsFromCache(spaceId, partId); @@ -208,14 +211,6 @@ class StorageClientBase { return {req.get_part_id()}; } - std::vector getReqPartsId(const cpp2::ScanEdgeRequest& req) const { - return {req.get_part_id()}; - } - - std::vector getReqPartsId(const cpp2::ScanVertexRequest& req) const { - return {req.get_part_id()}; - } - bool isValidHostPtr(const HostAddr* addr) { return addr != nullptr && !addr->host.empty() && addr->port != 0; } diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index be82a0ca2e3..e4ff187305c 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -560,24 +560,29 @@ struct LookupAndTraverseRequest { * End of Index section */ +struct ScanCursor { + 3: bool has_next, + // next start key of scan, only valid when has_next is true + 4: optional binary next_cursor, +} + struct ScanVertexRequest { 1: common.GraphSpaceID space_id, - 2: common.PartitionID part_id, - // start key of this block - 3: optional binary cursor, - 4: VertexProp return_columns, + 2: map (cpp.template = "std::unordered_map") + parts, + 3: list return_columns, // max row count of tag in this response - 5: i64 limit, + 4: i64 limit, // only return data in time range [start_time, end_time) - 6: optional i64 start_time, - 7: optional i64 end_time, - 8: optional binary filter, + 5: optional i64 start_time, + 6: optional i64 end_time, + 7: optional binary filter, // when storage enable multi versions and only_latest_version is true, only return latest version. // when storage disable multi versions, just use the default value. - 9: bool only_latest_version = false, + 8: bool only_latest_version = false, // if set to false, forbid follower read - 10: bool enable_read_from_follower = true, - 11: optional RequestCommon common, + 9: bool enable_read_from_follower = true, + 10: optional RequestCommon common, } struct ScanVertexResponse { @@ -586,29 +591,27 @@ struct ScanVertexResponse { // Each column represents one property. the column name is in the form of "tag_name.prop_alias" // in the same order which specified in VertexProp in request. 2: common.DataSet vertex_data, - 3: bool has_next, - // next start key of scan, only valid when has_next is true - 4: optional binary next_cursor, + 3: map (cpp.template = "std::unordered_map") + cursors; } struct ScanEdgeRequest { 1: common.GraphSpaceID space_id, - 2: common.PartitionID part_id, - // start key of this block - 3: optional binary cursor, - 4: EdgeProp return_columns, + 2: map (cpp.template = "std::unordered_map") + parts, + 3: EdgeProp return_columns, // max row count of edge in this response - 5: i64 limit, + 4: i64 limit, // only return data in time range [start_time, end_time) - 6: optional i64 start_time, - 7: optional i64 end_time, - 8: optional binary filter, + 5: optional i64 start_time, + 6: optional i64 end_time, + 7: optional binary filter, // when storage enable multi versions and only_latest_version is true, only return latest version. // when storage disable multi versions, just use the default value. - 9: bool only_latest_version = false, + 8: bool only_latest_version = false, // if set to false, forbid follower read - 10: bool enable_read_from_follower = true, - 11: optional RequestCommon common, + 9: bool enable_read_from_follower = true, + 10: optional RequestCommon common, } struct ScanEdgeResponse { @@ -617,9 +620,8 @@ struct ScanEdgeResponse { // Each column represents one property. the column name is in the form of "edge_name.prop_alias" // in the same order which specified in EdgeProp in requesss. 2: common.DataSet edge_data, - 3: bool has_next, - // next start key of scan, only valid when has_next is true - 4: optional binary next_cursor, + 3: map (cpp.template = "std::unordered_map") + cursors; } struct TaskPara { diff --git a/src/storage/exec/EdgeNode.h b/src/storage/exec/EdgeNode.h index 988923f5f44..beee24665f3 100644 --- a/src/storage/exec/EdgeNode.h +++ b/src/storage/exec/EdgeNode.h @@ -26,7 +26,9 @@ class EdgeNode : public IterateNode { return valueHandler(this->key(), this->reader(), props_); } - const std::string& getEdgeName() { return edgeName_; } + const std::string& getEdgeName() const { return edgeName_; } + + EdgeType edgeType() const { return edgeType_; } protected: EdgeNode(RuntimeContext* context, @@ -113,8 +115,7 @@ class FetchEdgeNode final : public EdgeNode { (*edgeKey.dst_ref()).getStr()); ret = context_->env()->kvstore_->get(context_->spaceId(), partId, key_, &val_); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { - resetReader(); - return nebula::cpp2::ErrorCode::SUCCEEDED; + return doExecute(key_, val_); } else if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { // regard key not found as succeed as well, upper node will handle it return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -122,6 +123,20 @@ class FetchEdgeNode final : public EdgeNode { return ret; } + nebula::cpp2::ErrorCode doExecute(const std::string& key, const std::string& value) { + key_ = key; + val_ = value; + resetReader(); + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + void clear() { + valid_ = false; + key_.clear(); + val_.clear(); + reader_.reset(); + } + private: void resetReader() { reader_.reset(*schemas_, val_); diff --git a/src/storage/exec/RelNode.h b/src/storage/exec/RelNode.h index 1588dbff831..8bc0f2ff39f 100644 --- a/src/storage/exec/RelNode.h +++ b/src/storage/exec/RelNode.h @@ -77,6 +77,8 @@ class RelNode { explicit RelNode(const std::string& name) : name_(name) {} + const std::string& name() const { return name_; } + std::string name_ = "RelNode"; std::vector*> dependencies_; bool hasDependents_ = false; diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h new file mode 100644 index 00000000000..3778eb87804 --- /dev/null +++ b/src/storage/exec/ScanNode.h @@ -0,0 +1,274 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#pragma once + +#include "common/base/Base.h" +#include "storage/exec/GetPropNode.h" + +namespace nebula { +namespace storage { + +using Cursor = std::string; + +// Node to scan vertices of one partition +class ScanVertexPropNode : public QueryNode { + public: + using RelNode::doExecute; + + explicit ScanVertexPropNode(RuntimeContext* context, + std::vector> tagNodes, + bool enableReadFollower, + int64_t limit, + std::unordered_map* cursors, + nebula::DataSet* resultDataSet) + : context_(context), + tagNodes_(std::move(tagNodes)), + enableReadFollower_(enableReadFollower), + limit_(limit), + cursors_(cursors), + resultDataSet_(resultDataSet) { + name_ = "ScanVertexPropNode"; + for (std::size_t i = 0; i < tagNodes_.size(); ++i) { + tagNodesIndex_.emplace(tagNodes_[i]->tagId(), i); + } + } + + nebula::cpp2::ErrorCode doExecute(PartitionID partId, const Cursor& cursor) override { + auto ret = RelNode::doExecute(partId); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } + + std::string start; + std::string prefix = NebulaKeyUtils::vertexPrefix(partId); + if (cursor.empty()) { + start = prefix; + } else { + start = cursor; + } + + std::unique_ptr iter; + auto kvRet = context_->env()->kvstore_->rangeWithPrefix( + context_->planContext_->spaceId_, partId, start, prefix, &iter, enableReadFollower_); + if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + return kvRet; + } + + const auto rowLimit = limit_; + auto vIdLen = context_->vIdLen(); + auto isIntId = context_->isIntId(); + std::string currentVertexId; + for (; iter->valid() && static_cast(resultDataSet_->rowSize()) < rowLimit; + iter->next()) { + auto key = iter->key(); + auto tagId = NebulaKeyUtils::getTagId(vIdLen, key); + auto tagIdIndex = tagNodesIndex_.find(tagId); + if (tagIdIndex == tagNodesIndex_.end()) { + continue; + } + auto vertexId = NebulaKeyUtils::getVertexId(vIdLen, key); + if (vertexId != currentVertexId && !currentVertexId.empty()) { + collectOneRow(isIntId, vIdLen, currentVertexId); + } // collect vertex row + currentVertexId = vertexId; + if (static_cast(resultDataSet_->rowSize()) >= rowLimit) { + break; + } + auto value = iter->val(); + tagNodes_[tagIdIndex->second]->doExecute(key.toString(), value.toString()); + } // iterate key + if (static_cast(resultDataSet_->rowSize()) < rowLimit) { + collectOneRow(isIntId, vIdLen, currentVertexId); + } + + cpp2::ScanCursor c; + if (iter->valid()) { + c.set_has_next(true); + c.set_next_cursor(iter->key().str()); + } else { + c.set_has_next(false); + } + cursors_->emplace(partId, std::move(c)); + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + void collectOneRow(bool isIntId, std::size_t vIdLen, const std::string& currentVertexId) { + List row; + nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED; + // vertexId is the first column + if (isIntId) { + row.emplace_back(*reinterpret_cast(currentVertexId.data())); + } else { + row.emplace_back(currentVertexId.c_str()); + } + // if none of the tag node valid, do not emplace the row + if (std::any_of(tagNodes_.begin(), tagNodes_.end(), [](const auto& tagNode) { + return tagNode->valid(); + })) { + for (auto& tagNode : tagNodes_) { + ret = tagNode->collectTagPropsIfValid( + [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + for (const auto& prop : *props) { + if (prop.returned_) { + row.emplace_back(Value()); + } + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }, + [&row, vIdLen, isIntId]( + folly::StringPiece key, + RowReader* reader, + const std::vector* props) -> nebula::cpp2::ErrorCode { + if (!QueryUtils::collectVertexProps(key, vIdLen, isIntId, reader, props, row).ok()) { + return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + break; + } + } + if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { + resultDataSet_->rows.emplace_back(std::move(row)); + } + for (auto& tagNode : tagNodes_) { + tagNode->clear(); + } + } + } + + private: + RuntimeContext* context_; + std::vector> tagNodes_; + std::unordered_map tagNodesIndex_; + bool enableReadFollower_; + int64_t limit_; + // cursors for next scan + std::unordered_map* cursors_; + nebula::DataSet* resultDataSet_; +}; + +// Node to scan edge of one partition +class ScanEdgePropNode : public QueryNode { + public: + using RelNode::doExecute; + + ScanEdgePropNode(RuntimeContext* context, + std::vector> edgeNodes, + bool enableReadFollower, + int64_t limit, + std::unordered_map* cursors, + nebula::DataSet* resultDataSet) + : context_(context), + edgeNodes_(std::move(edgeNodes)), + enableReadFollower_(enableReadFollower), + limit_(limit), + cursors_(cursors), + resultDataSet_(resultDataSet) { + QueryNode::name_ = "ScanEdgePropNode"; + for (std::size_t i = 0; i < edgeNodes_.size(); ++i) { + edgeNodesIndex_.emplace(edgeNodes_[i]->edgeType(), i); + } + } + + nebula::cpp2::ErrorCode doExecute(PartitionID partId, const Cursor& cursor) override { + auto ret = RelNode::doExecute(partId); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } + + std::string start; + std::string prefix = NebulaKeyUtils::edgePrefix(partId); + if (cursor.empty()) { + start = prefix; + } else { + start = cursor; + } + + std::unique_ptr iter; + auto kvRet = context_->env()->kvstore_->rangeWithPrefix( + context_->spaceId(), partId, start, prefix, &iter, enableReadFollower_); + if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + return kvRet; + } + + auto rowLimit = limit_; + auto vIdLen = context_->vIdLen(); + auto isIntId = context_->isIntId(); + for (; iter->valid() && static_cast(resultDataSet_->rowSize()) < rowLimit; + iter->next()) { + auto key = iter->key(); + if (!NebulaKeyUtils::isEdge(vIdLen, key)) { + continue; + } + auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen, key); + auto edgeNodeIndex = edgeNodesIndex_.find(edgeType); + if (edgeNodeIndex == edgeNodesIndex_.end()) { + continue; + } + auto value = iter->val(); + edgeNodes_[edgeNodeIndex->second]->doExecute(key.toString(), value.toString()); + collectOneRow(isIntId, vIdLen); + } + + cpp2::ScanCursor c; + if (iter->valid()) { + c.set_has_next(true); + c.set_next_cursor(iter->key().str()); + } else { + c.set_has_next(false); + } + cursors_->emplace(partId, std::move(c)); + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + + void collectOneRow(bool isIntId, std::size_t vIdLen) { + List row; + nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED; + for (auto& edgeNode : edgeNodes_) { + ret = edgeNode->collectEdgePropsIfValid( + [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + for (const auto& prop : *props) { + if (prop.returned_) { + row.emplace_back(Value()); + } + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }, + [&row, vIdLen, isIntId]( + folly::StringPiece key, + RowReader* reader, + const std::vector* props) -> nebula::cpp2::ErrorCode { + if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) { + return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; + } + return nebula::cpp2::ErrorCode::SUCCEEDED; + }); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + break; + } + } + if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { + resultDataSet_->rows.emplace_back(std::move(row)); + } + for (auto& edgeNode : edgeNodes_) { + edgeNode->clear(); + } + } + + private: + RuntimeContext* context_; + std::vector> edgeNodes_; + std::unordered_map edgeNodesIndex_; + bool enableReadFollower_; + int64_t limit_; + // cursors for next scan + std::unordered_map* cursors_; + nebula::DataSet* resultDataSet_; +}; + +} // namespace storage +} // namespace nebula diff --git a/src/storage/exec/TagNode.h b/src/storage/exec/TagNode.h index e203b494b6c..d6d597addc8 100644 --- a/src/storage/exec/TagNode.h +++ b/src/storage/exec/TagNode.h @@ -53,8 +53,7 @@ class TagNode final : public IterateNode { key_ = NebulaKeyUtils::vertexKey(context_->vIdLen(), partId, vId, tagId_); ret = context_->env()->kvstore_->get(context_->spaceId(), partId, key_, &value_); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { - resetReader(); - return nebula::cpp2::ErrorCode::SUCCEEDED; + return doExecute(key_, value_); } else if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { // regard key not found as succeed as well, upper node will handle it return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -62,6 +61,13 @@ class TagNode final : public IterateNode { return ret; } + nebula::cpp2::ErrorCode doExecute(const std::string& key, const std::string& value) { + key_ = key; + value_ = value; + resetReader(); + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + nebula::cpp2::ErrorCode collectTagPropsIfValid(NullHandler nullHandler, PropHandler valueHandler) { if (!valid()) { @@ -83,7 +89,16 @@ class TagNode final : public IterateNode { RowReader* reader() const override { return reader_.get(); } - const std::string& getTagName() { return tagName_; } + const std::string& getTagName() const { return tagName_; } + + TagID tagId() const { return tagId_; } + + void clear() { + valid_ = false; + key_.clear(); + value_.clear(); + reader_.reset(); + } private: void resetReader() { diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 61a4c2f9770..5da9b6425e6 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -24,81 +24,35 @@ void ScanEdgeProcessor::process(const cpp2::ScanEdgeRequest& req) { void ScanEdgeProcessor::doProcess(const cpp2::ScanEdgeRequest& req) { spaceId_ = req.get_space_id(); - partId_ = req.get_part_id(); + enableReadFollower_ = req.get_enable_read_from_follower(); + limit_ = req.get_limit(); auto retCode = getSpaceVidLen(spaceId_); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - pushResultCode(retCode, partId_); + for (auto& p : req.get_parts()) { + pushResultCode(retCode, p.first); + } onFinished(); return; } + this->planContext_ = std::make_unique( + this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref()); + retCode = checkAndBuildContexts(req); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - pushResultCode(retCode, partId_); + for (auto& p : req.get_parts()) { + pushResultCode(retCode, p.first); + } onFinished(); return; } - std::string start; - std::string prefix = NebulaKeyUtils::edgePrefix(partId_); - if (req.get_cursor() == nullptr || req.get_cursor()->empty()) { - start = prefix; + if (!FLAGS_query_concurrently) { + runInSingleThread(req); } else { - start = *req.get_cursor(); + runInMultipleThread(req); } - - std::unique_ptr iter; - auto kvRet = env_->kvstore_->rangeWithPrefix( - spaceId_, partId_, start, prefix, &iter, req.get_enable_read_from_follower()); - if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - handleErrorCode(kvRet, spaceId_, partId_); - onFinished(); - return; - } - - auto rowLimit = req.get_limit(); - RowReaderWrapper reader; - - for (int64_t rowCount = 0; iter->valid() && rowCount < rowLimit; iter->next()) { - auto key = iter->key(); - if (!NebulaKeyUtils::isEdge(spaceVidLen_, key)) { - continue; - } - - auto edgeType = NebulaKeyUtils::getEdgeType(spaceVidLen_, key); - auto edgeIter = edgeContext_.indexMap_.find(edgeType); - if (edgeIter == edgeContext_.indexMap_.end()) { - continue; - } - - auto val = iter->val(); - auto schemaIter = edgeContext_.schemas_.find(std::abs(edgeType)); - CHECK(schemaIter != edgeContext_.schemas_.end()); - reader.reset(schemaIter->second, val); - if (!reader) { - continue; - } - - nebula::List list; - auto idx = edgeIter->second; - auto props = &(edgeContext_.propContexts_[idx].second); - if (!QueryUtils::collectEdgeProps(key, spaceVidLen_, isIntId_, reader.get(), props, list) - .ok()) { - continue; - } - resultDataSet_.rows.emplace_back(std::move(list)); - rowCount++; - } - - if (iter->valid()) { - resp_.set_has_next(true); - resp_.set_next_cursor(iter->key().str()); - } else { - resp_.set_has_next(false); - } - onProcessFinished(); - onFinished(); } nebula::cpp2::ErrorCode ScanEdgeProcessor::checkAndBuildContexts(const cpp2::ScanEdgeRequest& req) { @@ -123,7 +77,100 @@ void ScanEdgeProcessor::buildEdgeColName(const std::vector& edge } } -void ScanEdgeProcessor::onProcessFinished() { resp_.set_edge_data(std::move(resultDataSet_)); } +void ScanEdgeProcessor::onProcessFinished() { + resp_.set_edge_data(std::move(resultDataSet_)); + resp_.set_cursors(std::move(cursors_)); +} + +StoragePlan ScanEdgeProcessor::buildPlan( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors) { + StoragePlan plan; + std::vector> edges; + for (const auto& ec : edgeContext_.propContexts_) { + edges.emplace_back( + std::make_unique(context, &edgeContext_, ec.first, &ec.second)); + } + auto output = std::make_unique( + context, std::move(edges), enableReadFollower_, limit_, cursors, result); + + plan.addNode(std::move(output)); + return plan; +} + +folly::Future> ScanEdgeProcessor::runInExecutor( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors, + PartitionID partId, + Cursor cursor) { + return folly::via(executor_, + [this, context, result, cursors, partId, input = std::move(cursor)]() { + auto plan = buildPlan(context, result, cursors); + + auto ret = plan.go(partId, input); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return std::make_pair(ret, partId); + } + return std::make_pair(nebula::cpp2::ErrorCode::SUCCEEDED, partId); + }); +} + +void ScanEdgeProcessor::runInSingleThread(const cpp2::ScanEdgeRequest& req) { + contexts_.emplace_back(RuntimeContext(planContext_.get())); + std::unordered_set failedParts; + auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_); + for (const auto& partEntry : req.get_parts()) { + auto partId = partEntry.first; + auto cursor = partEntry.second; + + auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : ""); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED && + failedParts.find(partId) == failedParts.end()) { + failedParts.emplace(partId); + handleErrorCode(ret, spaceId_, partId); + } + } + onProcessFinished(); + onFinished(); +} + +void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) { + cursorsOfPart_.resize(req.get_parts().size()); + for (size_t i = 0; i < req.get_parts().size(); i++) { + nebula::DataSet result = resultDataSet_; + results_.emplace_back(std::move(result)); + contexts_.emplace_back(RuntimeContext(planContext_.get())); + } + size_t i = 0; + std::vector>> futures; + for (const auto& [partId, cursor] : req.get_parts()) { + futures.emplace_back(runInExecutor(&contexts_[i], + &results_[i], + &cursorsOfPart_[i], + partId, + cursor.get_has_next() ? *cursor.get_next_cursor() : "")); + i++; + } + + folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) mutable { + CHECK(!t.hasException()); + const auto& tries = t.value(); + for (size_t j = 0; j < tries.size(); j++) { + CHECK(!tries[j].hasException()); + const auto& [code, partId] = tries[j].value(); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + handleErrorCode(code, spaceId_, partId); + } else { + resultDataSet_.append(std::move(results_[j])); + cursors_.merge(std::move(cursorsOfPart_[j])); + } + } + this->onProcessFinished(); + this->onFinished(); + }); +} } // namespace storage } // namespace nebula diff --git a/src/storage/query/ScanEdgeProcessor.h b/src/storage/query/ScanEdgeProcessor.h index f5afef9934c..f1931cd881b 100644 --- a/src/storage/query/ScanEdgeProcessor.h +++ b/src/storage/query/ScanEdgeProcessor.h @@ -7,6 +7,8 @@ #define STORAGE_QUERY_SCANEDGEPROCESSOR_H_ #include "common/base/Base.h" +#include "storage/exec/ScanNode.h" +#include "storage/exec/StoragePlan.h" #include "storage/query/QueryBaseProcessor.h" namespace nebula { @@ -35,9 +37,30 @@ class ScanEdgeProcessor : public QueryBaseProcessor& edgeProps); + StoragePlan buildPlan(RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors); + + folly::Future> runInExecutor( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors, + PartitionID partId, + Cursor cursor); + + void runInSingleThread(const cpp2::ScanEdgeRequest& req); + + void runInMultipleThread(const cpp2::ScanEdgeRequest& req); + void onProcessFinished() override; - PartitionID partId_; + std::vector contexts_; + std::vector results_; + std::vector> cursorsOfPart_; + + std::unordered_map cursors_; + int64_t limit_{-1}; + bool enableReadFollower_{false}; }; } // namespace storage diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index ee9e3981f70..032ec103660 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -24,77 +24,35 @@ void ScanVertexProcessor::process(const cpp2::ScanVertexRequest& req) { void ScanVertexProcessor::doProcess(const cpp2::ScanVertexRequest& req) { spaceId_ = req.get_space_id(); - partId_ = req.get_part_id(); + limit_ = req.get_limit(); + enableReadFollower_ = req.get_enable_read_from_follower(); auto retCode = getSpaceVidLen(spaceId_); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - pushResultCode(retCode, partId_); + for (const auto& p : req.get_parts()) { + pushResultCode(retCode, p.first); + } onFinished(); return; } + this->planContext_ = std::make_unique( + this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref()); + retCode = checkAndBuildContexts(req); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - pushResultCode(retCode, partId_); - onFinished(); - return; - } - - std::string start; - std::string prefix = NebulaKeyUtils::vertexPrefix(partId_); - if (req.get_cursor() == nullptr || req.get_cursor()->empty()) { - start = prefix; - } else { - start = *req.get_cursor(); - } - - std::unique_ptr iter; - auto kvRet = env_->kvstore_->rangeWithPrefix( - spaceId_, partId_, start, prefix, &iter, req.get_enable_read_from_follower()); - if (kvRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - handleErrorCode(kvRet, spaceId_, partId_); + for (const auto& p : req.get_parts()) { + pushResultCode(retCode, p.first); + } onFinished(); return; } - auto rowLimit = req.get_limit(); - RowReaderWrapper reader; - for (int64_t rowCount = 0; iter->valid() && rowCount < rowLimit; iter->next()) { - auto key = iter->key(); - - auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key); - auto tagIter = tagContext_.indexMap_.find(tagId); - if (tagIter == tagContext_.indexMap_.end()) { - continue; - } - - auto val = iter->val(); - auto schemaIter = tagContext_.schemas_.find(tagId); - CHECK(schemaIter != tagContext_.schemas_.end()); - reader.reset(schemaIter->second, val); - if (!reader) { - continue; - } - - nebula::List list; - auto idx = tagIter->second; - auto props = &(tagContext_.propContexts_[idx].second); - if (!QueryUtils::collectVertexProps(key, spaceVidLen_, isIntId_, reader.get(), props, list) - .ok()) { - continue; - } - resultDataSet_.rows.emplace_back(std::move(list)); - rowCount++; - } - - if (iter->valid()) { - resp_.set_has_next(true); - resp_.set_next_cursor(iter->key().str()); + if (!FLAGS_query_concurrently) { + runInSingleThread(req); } else { - resp_.set_has_next(false); + runInMultipleThread(req); } - onProcessFinished(); - onFinished(); } nebula::cpp2::ErrorCode ScanVertexProcessor::checkAndBuildContexts( @@ -104,13 +62,14 @@ nebula::cpp2::ErrorCode ScanVertexProcessor::checkAndBuildContexts( return ret; } - std::vector returnProps = {*req.return_columns_ref()}; + std::vector returnProps = *req.return_columns_ref(); ret = handleVertexProps(returnProps); buildTagColName(returnProps); return ret; } void ScanVertexProcessor::buildTagColName(const std::vector& tagProps) { + resultDataSet_.colNames.emplace_back(kVid); for (const auto& tagProp : tagProps) { auto tagId = tagProp.get_tag(); auto tagName = tagContext_.tagNames_[tagId]; @@ -120,7 +79,99 @@ void ScanVertexProcessor::buildTagColName(const std::vector& t } } -void ScanVertexProcessor::onProcessFinished() { resp_.set_vertex_data(std::move(resultDataSet_)); } +void ScanVertexProcessor::onProcessFinished() { + resp_.set_vertex_data(std::move(resultDataSet_)); + resp_.set_cursors(std::move(cursors_)); +} + +StoragePlan ScanVertexProcessor::buildPlan( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors) { + StoragePlan plan; + std::vector> tags; + for (const auto& tc : tagContext_.propContexts_) { + tags.emplace_back(std::make_unique(context, &tagContext_, tc.first, &tc.second)); + } + auto output = std::make_unique( + context, std::move(tags), enableReadFollower_, limit_, cursors, result); + + plan.addNode(std::move(output)); + return plan; +} + +folly::Future> ScanVertexProcessor::runInExecutor( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursorsOfPart, + PartitionID partId, + Cursor cursor) { + return folly::via(executor_, + [this, context, result, cursorsOfPart, partId, input = std::move(cursor)]() { + auto plan = buildPlan(context, result, cursorsOfPart); + + auto ret = plan.go(partId, input); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return std::make_pair(ret, partId); + } + return std::make_pair(nebula::cpp2::ErrorCode::SUCCEEDED, partId); + }); +} + +void ScanVertexProcessor::runInSingleThread(const cpp2::ScanVertexRequest& req) { + contexts_.emplace_back(RuntimeContext(planContext_.get())); + std::unordered_set failedParts; + auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_); + for (const auto& partEntry : req.get_parts()) { + auto partId = partEntry.first; + auto cursor = partEntry.second; + + auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : ""); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED && + failedParts.find(partId) == failedParts.end()) { + failedParts.emplace(partId); + handleErrorCode(ret, spaceId_, partId); + } + } + onProcessFinished(); + onFinished(); +} + +void ScanVertexProcessor::runInMultipleThread(const cpp2::ScanVertexRequest& req) { + cursorsOfPart_.resize(req.get_parts().size()); + for (size_t i = 0; i < req.get_parts().size(); i++) { + nebula::DataSet result = resultDataSet_; + results_.emplace_back(std::move(result)); + contexts_.emplace_back(RuntimeContext(planContext_.get())); + } + size_t i = 0; + std::vector>> futures; + for (const auto& [partId, cursor] : req.get_parts()) { + futures.emplace_back(runInExecutor(&contexts_[i], + &results_[i], + &cursorsOfPart_[i], + partId, + cursor.get_has_next() ? *cursor.get_next_cursor() : "")); + i++; + } + + folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) mutable { + CHECK(!t.hasException()); + const auto& tries = t.value(); + for (size_t j = 0; j < tries.size(); j++) { + CHECK(!tries[j].hasException()); + const auto& [code, partId] = tries[j].value(); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + handleErrorCode(code, spaceId_, partId); + } else { + resultDataSet_.append(std::move(results_[j])); + cursors_.merge(std::move(cursorsOfPart_[j])); + } + } + this->onProcessFinished(); + this->onFinished(); + }); +} } // namespace storage } // namespace nebula diff --git a/src/storage/query/ScanVertexProcessor.h b/src/storage/query/ScanVertexProcessor.h index d6c988adf43..987c77e2edd 100644 --- a/src/storage/query/ScanVertexProcessor.h +++ b/src/storage/query/ScanVertexProcessor.h @@ -7,6 +7,8 @@ #define STORAGE_QUERY_SCANVERTEXPROCESSOR_H_ #include "common/base/Base.h" +#include "storage/exec/ScanNode.h" +#include "storage/exec/StoragePlan.h" #include "storage/query/QueryBaseProcessor.h" namespace nebula { @@ -36,10 +38,31 @@ class ScanVertexProcessor void buildTagColName(const std::vector& tagProps); + StoragePlan buildPlan(RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors); + + folly::Future> runInExecutor( + RuntimeContext* context, + nebula::DataSet* result, + std::unordered_map* cursors, + PartitionID partId, + Cursor cursor); + + void runInSingleThread(const cpp2::ScanVertexRequest& req); + + void runInMultipleThread(const cpp2::ScanVertexRequest& req); + void onProcessFinished() override; private: - PartitionID partId_; + std::vector contexts_; + std::vector results_; + std::vector> cursorsOfPart_; + + std::unordered_map cursors_; + int64_t limit_{-1}; + bool enableReadFollower_{false}; }; } // namespace storage diff --git a/src/storage/test/ScanEdgeTest.cpp b/src/storage/test/ScanEdgeTest.cpp index 7155534c496..3ed3d41a9dc 100644 --- a/src/storage/test/ScanEdgeTest.cpp +++ b/src/storage/test/ScanEdgeTest.cpp @@ -3,6 +3,7 @@ * This source code is licensed under Apache 2.0 License. */ +#include #include #include "common/base/Base.h" @@ -13,8 +14,8 @@ namespace nebula { namespace storage { -cpp2::ScanEdgeRequest buildRequest(PartitionID partId, - const std::string& cursor, +cpp2::ScanEdgeRequest buildRequest(std::vector partIds, + std::vector cursors, const std::pair>& edge, int64_t rowLimit = 100, int64_t startTime = 0, @@ -22,8 +23,15 @@ cpp2::ScanEdgeRequest buildRequest(PartitionID partId, bool onlyLatestVer = false) { cpp2::ScanEdgeRequest req; req.set_space_id(1); - req.set_part_id(partId); - req.set_cursor(cursor); + cpp2::ScanCursor c; + CHECK_EQ(partIds.size(), cursors.size()); + std::unordered_map parts; + for (std::size_t i = 0; i < partIds.size(); ++i) { + c.set_has_next(!cursors[i].empty()); + c.set_next_cursor(cursors[i]); + parts.emplace(partIds[i], c); + } + req.set_parts(std::move(parts)); EdgeType edgeType = edge.first; cpp2::EdgeProp edgeProp; edgeProp.set_type(edgeType); @@ -96,7 +104,7 @@ TEST(ScanEdgeTest, PropertyTest) { serve, std::vector{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"}); for (PartitionID partId = 1; partId <= totalParts; partId++) { - auto req = buildRequest(partId, "", edge); + auto req = buildRequest({partId}, {""}, edge); auto* processor = ScanEdgeProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -112,7 +120,7 @@ TEST(ScanEdgeTest, PropertyTest) { size_t totalRowCount = 0; auto edge = std::make_pair(serve, std::vector{}); for (PartitionID partId = 1; partId <= totalParts; partId++) { - auto req = buildRequest(partId, "", edge); + auto req = buildRequest({partId}, {""}, edge); auto* processor = ScanEdgeProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -147,7 +155,7 @@ TEST(ScanEdgeTest, CursorTest) { bool hasNext = true; std::string cursor = ""; while (hasNext) { - auto req = buildRequest(partId, cursor, edge, 5); + auto req = buildRequest({partId}, {cursor}, edge, 5); auto* processor = ScanEdgeProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -155,10 +163,10 @@ TEST(ScanEdgeTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount); - hasNext = resp.get_has_next(); + hasNext = resp.get_cursors().at(partId).get_has_next(); if (hasNext) { - CHECK(resp.next_cursor_ref().has_value()); - cursor = *resp.next_cursor_ref(); + CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value()); + cursor = *resp.get_cursors().at(partId).next_cursor_ref(); } } } @@ -174,7 +182,7 @@ TEST(ScanEdgeTest, CursorTest) { bool hasNext = true; std::string cursor = ""; while (hasNext) { - auto req = buildRequest(partId, cursor, edge, 1); + auto req = buildRequest({partId}, {cursor}, edge, 1); auto* processor = ScanEdgeProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -182,10 +190,10 @@ TEST(ScanEdgeTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount); - hasNext = resp.get_has_next(); + hasNext = resp.get_cursors().at(partId).get_has_next(); if (hasNext) { - CHECK(resp.next_cursor_ref().has_value()); - cursor = *resp.next_cursor_ref(); + CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value()); + cursor = *resp.get_cursors().at(partId).next_cursor_ref(); } } } @@ -193,6 +201,94 @@ TEST(ScanEdgeTest, CursorTest) { } } +TEST(ScanEdgeTest, MultiplePartsTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + EdgeType serve = 101; + + { + LOG(INFO) << "Scan one edge with some properties in one batch"; + size_t totalRowCount = 0; + auto edge = std::make_pair( + serve, + std::vector{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"}); + auto req = buildRequest({1, 3}, {"", ""}, edge); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount); + } + { + LOG(INFO) << "Scan one edge with all properties in one batch"; + size_t totalRowCount = 0; + auto edge = std::make_pair(serve, std::vector{}); + auto req = buildRequest({1, 3}, {"", ""}, edge); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + // all 9 columns in value + checkResponse(*resp.edge_data_ref(), edge, 9, totalRowCount); + } +} + +TEST(ScanEdgeTest, LimitTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + EdgeType serve = 101; + + { + LOG(INFO) << "Scan one edge with some properties in one batch"; + constexpr std::size_t limit = 3; + size_t totalRowCount = 0; + auto edge = std::make_pair( + serve, + std::vector{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"}); + auto req = buildRequest({1}, {""}, edge, limit); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + checkResponse(*resp.edge_data_ref(), edge, edge.second.size(), totalRowCount); + EXPECT_EQ(totalRowCount, limit); + } + { + LOG(INFO) << "Scan one edge with all properties in one batch"; + constexpr std::size_t limit = 3; + size_t totalRowCount = 0; + auto edge = std::make_pair(serve, std::vector{}); + auto req = buildRequest({1}, {""}, edge, limit); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + // all 9 columns in value + checkResponse(*resp.edge_data_ref(), edge, 9, totalRowCount); + EXPECT_EQ(totalRowCount, limit); + } +} + } // namespace storage } // namespace nebula diff --git a/src/storage/test/ScanVertexTest.cpp b/src/storage/test/ScanVertexTest.cpp index 6e2cb6f8ae4..f582848ad92 100644 --- a/src/storage/test/ScanVertexTest.cpp +++ b/src/storage/test/ScanVertexTest.cpp @@ -3,6 +3,7 @@ * This source code is licensed under Apache 2.0 License. */ +#include #include #include "common/base/Base.h" @@ -13,24 +14,36 @@ namespace nebula { namespace storage { -cpp2::ScanVertexRequest buildRequest(PartitionID partId, - const std::string& cursor, - const std::pair>& tag, - int64_t rowLimit = 100, - int64_t startTime = 0, - int64_t endTime = std::numeric_limits::max(), - bool onlyLatestVer = false) { +cpp2::ScanVertexRequest buildRequest( + std::vector partIds, + std::vector cursors, + const std::vector>>& tags, + int64_t rowLimit = 100, + int64_t startTime = 0, + int64_t endTime = std::numeric_limits::max(), + bool onlyLatestVer = false) { cpp2::ScanVertexRequest req; req.set_space_id(1); - req.set_part_id(partId); - req.set_cursor(cursor); - TagID tagId = tag.first; - cpp2::VertexProp vertexProp; - vertexProp.set_tag(tagId); - for (const auto& prop : tag.second) { - (*vertexProp.props_ref()).emplace_back(std::move(prop)); + cpp2::ScanCursor c; + CHECK_EQ(partIds.size(), cursors.size()); + std::unordered_map parts; + for (std::size_t i = 0; i < partIds.size(); ++i) { + c.set_has_next(!cursors[i].empty()); + c.set_next_cursor(cursors[i]); + parts.emplace(partIds[i], c); } - req.set_return_columns(std::move(vertexProp)); + req.set_parts(std::move(parts)); + std::vector vertexProps; + for (const auto& tag : tags) { + TagID tagId = tag.first; + cpp2::VertexProp vertexProp; + vertexProp.set_tag(tagId); + for (const auto& prop : tag.second) { + (*vertexProp.props_ref()).emplace_back(std::move(prop)); + } + vertexProps.emplace_back(std::move(vertexProp)); + } + req.set_return_columns(std::move(vertexProps)); req.set_limit(rowLimit); req.set_start_time(startTime); req.set_end_time(endTime); @@ -44,9 +57,10 @@ void checkResponse(const nebula::DataSet& dataSet, size_t& totalRowCount) { ASSERT_EQ(dataSet.colNames.size(), expectColumnCount); if (!tag.second.empty()) { - ASSERT_EQ(dataSet.colNames.size(), tag.second.size()); - for (size_t i = 0; i < dataSet.colNames.size(); i++) { - ASSERT_EQ(dataSet.colNames[i], std::to_string(tag.first) + "." + tag.second[i]); + ASSERT_EQ(dataSet.colNames.size(), tag.second.size() + 1 /* kVid*/); + for (size_t i = 0; i < dataSet.colNames.size() - 1 /* kVid */; i++) { + ASSERT_EQ(dataSet.colNames[i + 1 /* kVid */], + std::to_string(tag.first) + "." + tag.second[i]); } } totalRowCount += dataSet.rows.size(); @@ -63,13 +77,17 @@ void checkResponse(const nebula::DataSet& dataSet, mock::MockData::players_.end(), [&](const auto& player) { return player.name_ == vId; }); CHECK(iter != mock::MockData::players_.end()); - QueryTestUtils::checkPlayer(props, *iter, row.values); + std::vector returnProps({kVid}); + returnProps.insert(returnProps.end(), props.begin(), props.end()); + QueryTestUtils::checkPlayer(returnProps, *iter, row.values); break; } case 2: { // tag team auto iter = std::find(mock::MockData::teams_.begin(), mock::MockData::teams_.end(), vId); - QueryTestUtils::checkTeam(props, *iter, row.values); + std::vector returnProps({kVid}); + returnProps.insert(returnProps.end(), props.begin(), props.end()); + QueryTestUtils::checkTeam(returnProps, *iter, row.values); break; } default: @@ -95,14 +113,14 @@ TEST(ScanVertexTest, PropertyTest) { auto tag = std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); for (PartitionID partId = 1; partId <= totalParts; partId++) { - auto req = buildRequest(partId, "", tag); + auto req = buildRequest({partId}, {""}, {tag}); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); ASSERT_EQ(0, resp.result.failed_parts.size()); - checkResponse(*resp.vertex_data_ref(), tag, tag.second.size(), totalRowCount); + checkResponse(*resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); } CHECK_EQ(mock::MockData::players_.size(), totalRowCount); } @@ -110,8 +128,20 @@ TEST(ScanVertexTest, PropertyTest) { LOG(INFO) << "Scan one tag with all properties in one batch"; size_t totalRowCount = 0; auto tag = std::make_pair(player, std::vector{}); + auto respTag = std::make_pair(player, + std::vector{"name", + "age", + "playing", + "career", + "startYear", + "endYear", + "games", + "avgScore", + "serveTeams", + "country", + "champions"}); for (PartitionID partId = 1; partId <= totalParts; partId++) { - auto req = buildRequest(partId, "", tag); + auto req = buildRequest({partId}, {""}, {tag}); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); @@ -119,7 +149,7 @@ TEST(ScanVertexTest, PropertyTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); // all 11 columns in value - checkResponse(*resp.vertex_data_ref(), tag, 11, totalRowCount); + checkResponse(*resp.vertex_data_ref(), respTag, 11 + 1 /* kVid */, totalRowCount); } CHECK_EQ(mock::MockData::players_.size(), totalRowCount); } @@ -145,18 +175,19 @@ TEST(ScanVertexTest, CursorTest) { bool hasNext = true; std::string cursor = ""; while (hasNext) { - auto req = buildRequest(partId, cursor, tag, 5); + auto req = buildRequest({partId}, {cursor}, {tag}, 5); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); ASSERT_EQ(0, resp.result.failed_parts.size()); - checkResponse(*resp.vertex_data_ref(), tag, tag.second.size(), totalRowCount); - hasNext = resp.get_has_next(); + checkResponse( + *resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); + hasNext = resp.get_cursors().at(partId).get_has_next(); if (hasNext) { - CHECK(resp.next_cursor_ref()); - cursor = *resp.next_cursor_ref(); + CHECK(resp.get_cursors().at(partId).next_cursor_ref()); + cursor = *resp.get_cursors().at(partId).next_cursor_ref(); } } } @@ -171,18 +202,19 @@ TEST(ScanVertexTest, CursorTest) { bool hasNext = true; std::string cursor = ""; while (hasNext) { - auto req = buildRequest(partId, cursor, tag, 1); + auto req = buildRequest({partId}, {cursor}, {tag}, 1); auto* processor = ScanVertexProcessor::instance(env, nullptr); auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); ASSERT_EQ(0, resp.result.failed_parts.size()); - checkResponse(*resp.vertex_data_ref(), tag, tag.second.size(), totalRowCount); - hasNext = resp.get_has_next(); + checkResponse( + *resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); + hasNext = resp.get_cursors().at(partId).get_has_next(); if (hasNext) { - CHECK(resp.next_cursor_ref()); - cursor = *resp.next_cursor_ref(); + CHECK(resp.get_cursors().at(partId).next_cursor_ref()); + cursor = *resp.get_cursors().at(partId).next_cursor_ref(); } } } @@ -190,6 +222,221 @@ TEST(ScanVertexTest, CursorTest) { } } +TEST(ScanVertexTest, MultiplePartsTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + TagID player = 1; + + { + LOG(INFO) << "Scan one tag with some properties in one batch"; + size_t totalRowCount = 0; + auto tag = + std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); + auto req = buildRequest({1, 3}, {"", ""}, {tag}); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + checkResponse(*resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); + } + { + LOG(INFO) << "Scan one tag with all properties in one batch"; + size_t totalRowCount = 0; + auto tag = std::make_pair(player, std::vector{}); + auto respTag = std::make_pair(player, + std::vector{"name", + "age", + "playing", + "career", + "startYear", + "endYear", + "games", + "avgScore", + "serveTeams", + "country", + "champions"}); + auto req = buildRequest({1, 3}, {"", ""}, {tag}); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + // all 11 columns in value + checkResponse(*resp.vertex_data_ref(), respTag, 11 + 1 /* kVid */, totalRowCount); + } +} + +TEST(ScanVertexTest, LimitTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + TagID player = 1; + + { + LOG(INFO) << "Scan one tag with some properties in one batch"; + constexpr std::size_t limit = 3; + size_t totalRowCount = 0; + auto tag = + std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); + auto req = buildRequest({2}, {""}, {tag}, limit); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + checkResponse(*resp.vertex_data_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); + EXPECT_EQ(totalRowCount, limit); + } + { + LOG(INFO) << "Scan one tag with all properties in one batch"; + constexpr std::size_t limit = 3; + size_t totalRowCount = 0; + auto tag = std::make_pair(player, std::vector{}); + auto respTag = std::make_pair(player, + std::vector{"name", + "age", + "playing", + "career", + "startYear", + "endYear", + "games", + "avgScore", + "serveTeams", + "country", + "champions"}); + auto req = buildRequest({2}, {""}, {tag}, limit); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + // all 11 columns in value + checkResponse(*resp.vertex_data_ref(), respTag, 11 + 1 /* kVid */, totalRowCount); + EXPECT_EQ(totalRowCount, limit); + } +} + +TEST(ScanVertexTest, MultipleTagsTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + TagID player = 1; + TagID team = 2; + + { + LOG(INFO) << "Scan one tag with some properties in one batch"; + // size_t totalRowCount = 0; + auto playerTag = + std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); + auto teamTag = std::make_pair(team, std::vector{kTag, "name"}); + auto req = buildRequest({1}, {""}, {playerTag, teamTag}); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + nebula::DataSet expect( + {"_vid", "1._vid", "1._tag", "1.name", "1.age", "1.avgScore", "2._tag", "2.name"}); + expect.emplace_back(List({"Bulls", + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + 2, + "Bulls"})); + expect.emplace_back(List({"Cavaliers", + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + 2, + "Cavaliers"})); + expect.emplace_back(List({"Damian Lillard", + "Damian Lillard", + 1, + "Damian Lillard", + 29, + 24, + Value::kEmpty, + Value::kEmpty})); + expect.emplace_back(List( + {"Jason Kidd", "Jason Kidd", 1, "Jason Kidd", 47, 12.6, Value::kEmpty, Value::kEmpty})); + expect.emplace_back(List( + {"Kevin Durant", "Kevin Durant", 1, "Kevin Durant", 31, 27, Value::kEmpty, Value::kEmpty})); + expect.emplace_back(List( + {"Kobe Bryant", "Kobe Bryant", 1, "Kobe Bryant", 41, 25, Value::kEmpty, Value::kEmpty})); + expect.emplace_back(List({"Kristaps Porzingis", + "Kristaps Porzingis", + 1, + "Kristaps Porzingis", + 24, + 18.1, + Value::kEmpty, + Value::kEmpty})); + expect.emplace_back(List( + {"Luka Doncic", "Luka Doncic", 1, "Luka Doncic", 21, 24.4, Value::kEmpty, Value::kEmpty})); + expect.emplace_back(List({"Mavericks", + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + 2, + "Mavericks"})); + expect.emplace_back(List({"Nuggets", + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + Value::kEmpty, + 2, + "Nuggets"})); + expect.emplace_back(List( + {"Paul George", "Paul George", 1, "Paul George", 30, 19.9, Value::kEmpty, Value::kEmpty})); + expect.emplace_back(List({"Tracy McGrady", + "Tracy McGrady", + 1, + "Tracy McGrady", + 41, + 19.6, + Value::kEmpty, + Value::kEmpty})); + expect.emplace_back(List({"Vince Carter", + "Vince Carter", + 1, + "Vince Carter", + 43, + 16.7, + Value::kEmpty, + Value::kEmpty})); + EXPECT_EQ(expect, *resp.vertex_data_ref()); + } +} + } // namespace storage } // namespace nebula