From 86bb15a492a2f7f30c532b1bc5dd64278c16d762 Mon Sep 17 00:00:00 2001 From: jimingquan Date: Thu, 22 Sep 2022 14:37:39 +0800 Subject: [PATCH 1/5] add vertex filter for GetNeighbors --- src/clients/storage/StorageClient.cpp | 6 ++++- src/clients/storage/StorageClient.h | 3 ++- src/graph/executor/algo/BatchShortestPath.cpp | 1 + .../executor/algo/SingleShortestPath.cpp | 1 + .../executor/query/GetNeighborsExecutor.cpp | 3 ++- src/graph/executor/query/TraverseExecutor.cpp | 3 ++- src/interface/storage.thrift | 2 ++ src/storage/CommonUtils.h | 1 + src/storage/exec/FilterNode.h | 12 ++++++++++ src/storage/exec/GetNeighborsNode.h | 4 ++++ src/storage/query/GetNeighborsProcessor.cpp | 22 ++++++++++++------- src/storage/query/GetPropProcessor.cpp | 3 ++- src/storage/query/QueryBaseProcessor-inl.h | 11 ++++++++-- src/storage/query/QueryBaseProcessor.h | 3 ++- src/storage/query/ScanEdgeProcessor.cpp | 3 ++- src/storage/query/ScanVertexProcessor.cpp | 3 ++- 16 files changed, 63 insertions(+), 18 deletions(-) diff --git a/src/clients/storage/StorageClient.cpp b/src/clients/storage/StorageClient.cpp index e40385b30ba..57069e6c370 100644 --- a/src/clients/storage/StorageClient.cpp +++ b/src/clients/storage/StorageClient.cpp @@ -51,7 +51,8 @@ StorageRpcRespFuture StorageClient::getNeighbors( bool random, const std::vector& orderBy, int64_t limit, - const Expression* filter) { + const Expression* filter, + const Expression* vertexFilter) { auto cbStatus = getIdFromValue(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( @@ -97,6 +98,9 @@ StorageRpcRespFuture StorageClient::getNeighbors( if (filter != nullptr) { spec.filter_ref() = filter->encode(); } + if (vertexFilter != nullptr) { + spec.vertex_filter_ref() = vertexFilter->encode(); + } req.traverse_spec_ref() = std::move(spec); } diff --git a/src/clients/storage/StorageClient.h b/src/clients/storage/StorageClient.h index f6e58a05b70..442109c7e8b 100644 --- a/src/clients/storage/StorageClient.h +++ b/src/clients/storage/StorageClient.h @@ -78,7 +78,8 @@ class StorageClient bool random = false, const std::vector& orderBy = std::vector(), int64_t limit = std::numeric_limits::max(), - const Expression* filter = nullptr); + const Expression* filter = nullptr, + const Expression* vertexFilter = nullptr); StorageRpcRespFuture getDstBySrc( const CommonRequestParam& param, diff --git a/src/graph/executor/algo/BatchShortestPath.cpp b/src/graph/executor/algo/BatchShortestPath.cpp index 7ad1cfa6146..2b0ab6fc8cc 100644 --- a/src/graph/executor/algo/BatchShortestPath.cpp +++ b/src/graph/executor/algo/BatchShortestPath.cpp @@ -133,6 +133,7 @@ folly::Future BatchShortestPath::getNeighbors(size_t rowNum, size_t step false, {}, -1, + nullptr, nullptr) .via(qctx_->rctx()->runner()) .thenValue([this, rowNum, reverse, stepNum, getNbrTime](auto&& resp) { diff --git a/src/graph/executor/algo/SingleShortestPath.cpp b/src/graph/executor/algo/SingleShortestPath.cpp index 401edc8ada3..6b2affb2f40 100644 --- a/src/graph/executor/algo/SingleShortestPath.cpp +++ b/src/graph/executor/algo/SingleShortestPath.cpp @@ -104,6 +104,7 @@ folly::Future SingleShortestPath::getNeighbors(size_t rowNum, false, {}, -1, + nullptr, nullptr) .via(qctx_->rctx()->runner()) .thenValue([this, rowNum, stepNum, getNbrTime, reverse](auto&& resp) { diff --git a/src/graph/executor/query/GetNeighborsExecutor.cpp b/src/graph/executor/query/GetNeighborsExecutor.cpp index 986dcc9152e..00694543acb 100644 --- a/src/graph/executor/query/GetNeighborsExecutor.cpp +++ b/src/graph/executor/query/GetNeighborsExecutor.cpp @@ -53,7 +53,8 @@ folly::Future GetNeighborsExecutor::execute() { gn_->random(), gn_->orderBy(), gn_->limit(qec), - gn_->filter()) + gn_->filter(), + nullptr) .via(runner()) .ensure([this, getNbrTime]() { SCOPED_TIMER(&execTime_); diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 1af3a78f1b5..d9ef3f19a6e 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -94,7 +94,8 @@ folly::Future TraverseExecutor::getNeighbors() { finalStep ? traverse_->random() : false, finalStep ? traverse_->orderBy() : std::vector(), finalStep ? traverse_->limit(qctx()) : -1, - selectFilter()) + selectFilter(), + nullptr) .via(runner()) .thenValue([this, getNbrTime](StorageRpcResponse&& resp) mutable { SCOPED_TIMER(&execTime_); diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index daf11e02f73..3ff610192e3 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -158,6 +158,8 @@ struct TraverseSpec { 10: optional i64 limit, // If provided, only the rows satisfied the given expression will be returned 11: optional binary filter, + // only contain filter expression for vertex + 12: optional binary vertex_filter, } diff --git a/src/storage/CommonUtils.h b/src/storage/CommonUtils.h index 2826dc654d1..0cab8f44c67 100644 --- a/src/storage/CommonUtils.h +++ b/src/storage/CommonUtils.h @@ -134,6 +134,7 @@ enum class ResultStatus { NORMAL = 0, ILLEGAL_DATA = -1, FILTER_OUT = -2, + TAG_FILTER_OUT = -3, }; struct PropContext; diff --git a/src/storage/exec/FilterNode.h b/src/storage/exec/FilterNode.h index d8d24dd88ae..70059ee306b 100644 --- a/src/storage/exec/FilterNode.h +++ b/src/storage/exec/FilterNode.h @@ -68,6 +68,10 @@ class FilterNode : public IterateNode { mode_ = mode; } + void setVertexFilter(Expression* vertexFilter) { + vertexFilterExp_ = vertexFilter; + } + private: bool check() override { if (filterExp_ == nullptr) { @@ -93,6 +97,13 @@ class FilterNode : public IterateNode { // return true when the value iter points to a value which can filter bool checkTagAndEdge() { expCtx_->reset(this->reader(), this->key().str()); + if (vertexFilterExp_ != nullptr) { + auto res = vertexFilterExp_->eval(*expCtx_); + if (!res.isBool() || !res.getBool()) { + context_->resultStat_ = ResultStatus::TAG_FILTER_OUT; + return false; + } + } // result is false when filter out auto result = filterExp_->eval(*expCtx_); // NULL is always false @@ -104,6 +115,7 @@ class FilterNode : public IterateNode { RuntimeContext* context_; StorageExpressionContext* expCtx_; Expression* filterExp_; + Expression* vertexFilterExp_; FilterMode mode_{FilterMode::TAG_AND_EDGE}; int32_t callCheck{0}; }; diff --git a/src/storage/exec/GetNeighborsNode.h b/src/storage/exec/GetNeighborsNode.h index c892ae7bee3..60f61167964 100644 --- a/src/storage/exec/GetNeighborsNode.h +++ b/src/storage/exec/GetNeighborsNode.h @@ -50,6 +50,10 @@ class GetNeighborsNode : public QueryNode { return nebula::cpp2::ErrorCode::E_INVALID_DATA; } + if (context_->resultStat_ == ResultStatus::TAG_FILTER_OUT) { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + std::vector row; // vertexId is the first column if (context_->isIntId()) { diff --git a/src/storage/query/GetNeighborsProcessor.cpp b/src/storage/query/GetNeighborsProcessor.cpp index 97d34f85dec..a60ebf7fec9 100644 --- a/src/storage/query/GetNeighborsProcessor.cpp +++ b/src/storage/query/GetNeighborsProcessor.cpp @@ -261,7 +261,10 @@ StoragePlan GetNeighborsProcessor::buildPlan(RuntimeContext* context, filter->addDependency(upstream); upstream = filter.get(); if (edges.empty()) { - filter.get()->setFilterMode(FilterMode::TAG_ONLY); + filter->setFilterMode(FilterMode::TAG_ONLY); + } + if (vertexFilter_) { + filter->setVertexFilter(vertexFilter_->clone()); } plan.addNode(std::move(filter)); } @@ -313,13 +316,16 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::checkAndBuildContexts( if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return code; } - code = buildFilter(req, [](const cpp2::GetNeighborsRequest& r) -> const std::string* { - if (r.get_traverse_spec().filter_ref().has_value()) { - return r.get_traverse_spec().get_filter(); - } else { - return nullptr; - } - }); + code = + buildFilter(req, [](const cpp2::GetNeighborsRequest& r, bool isVertex) -> const std::string* { + if (isVertex) { + return r.get_traverse_spec().vertex_filter_ref().has_value() + ? r.get_traverse_spec().get_vertex_filter() + : nullptr; + } + return r.get_traverse_spec().filter_ref().has_value() ? r.get_traverse_spec().get_filter() + : nullptr; + }); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return code; } diff --git a/src/storage/query/GetPropProcessor.cpp b/src/storage/query/GetPropProcessor.cpp index c20030002e2..cb5bdb5c641 100644 --- a/src/storage/query/GetPropProcessor.cpp +++ b/src/storage/query/GetPropProcessor.cpp @@ -272,7 +272,8 @@ nebula::cpp2::ErrorCode GetPropProcessor::checkAndBuildContexts(const cpp2::GetP return code; } } - code = buildFilter(req, [](const cpp2::GetPropRequest& r) -> const std::string* { + code = buildFilter(req, [](const cpp2::GetPropRequest& r, bool isVertex) -> const std::string* { + UNUSED(isVertex); if (r.filter_ref().has_value()) { return r.get_filter(); } else { diff --git a/src/storage/query/QueryBaseProcessor-inl.h b/src/storage/query/QueryBaseProcessor-inl.h index a0c61a7520d..81acf88774e 100644 --- a/src/storage/query/QueryBaseProcessor-inl.h +++ b/src/storage/query/QueryBaseProcessor-inl.h @@ -138,8 +138,8 @@ nebula::cpp2::ErrorCode QueryBaseProcessor::buildYields(const REQ& re template nebula::cpp2::ErrorCode QueryBaseProcessor::buildFilter( - const REQ& req, std::function&& getFilter) { - const auto* filterStr = getFilter(req); + const REQ& req, std::function&& getFilter) { + const auto* filterStr = getFilter(req, false); if (filterStr == nullptr) { return nebula::cpp2::ErrorCode::SUCCEEDED; } @@ -152,6 +152,13 @@ nebula::cpp2::ErrorCode QueryBaseProcessor::buildFilter( if (filter_ == nullptr) { return nebula::cpp2::ErrorCode::E_INVALID_FILTER; } + const auto* vertexFilterStr = getFilter(req, true); + if (vertexFilterStr != nullptr && !vertexFilterStr->empty()) { + vertexFilter_ = Expression::decode(pool, *vertexFilterStr); + if (vertexFilter_ == nullptr) { + return nebula::cpp2::ErrorCode::E_INVALID_FILTER; + } + } return checkExp(filter_, false, true, false, true); } return nebula::cpp2::ErrorCode::SUCCEEDED; diff --git a/src/storage/query/QueryBaseProcessor.h b/src/storage/query/QueryBaseProcessor.h index 4160013b8ba..00dcab38b75 100644 --- a/src/storage/query/QueryBaseProcessor.h +++ b/src/storage/query/QueryBaseProcessor.h @@ -169,7 +169,7 @@ class QueryBaseProcessor : public BaseProcessor { nebula::cpp2::ErrorCode handleEdgeProps(std::vector& edgeProps); nebula::cpp2::ErrorCode buildFilter( - const REQ& req, std::function&& getFilter); + const REQ& req, std::function&& getFilter); nebula::cpp2::ErrorCode buildYields(const REQ& req); // build ttl info map @@ -207,6 +207,7 @@ class QueryBaseProcessor : public BaseProcessor { TagContext tagContext_; EdgeContext edgeContext_; Expression* filter_{nullptr}; + Expression* vertexFilter_{nullptr}; // Collect prop in value expression in upsert set clause std::unordered_set valueProps_; diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 780f0fbaf27..f595637a4be 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -65,7 +65,8 @@ nebula::cpp2::ErrorCode ScanEdgeProcessor::checkAndBuildContexts(const cpp2::Sca std::vector returnProps = *req.return_columns_ref(); ret = handleEdgeProps(returnProps); buildEdgeColName(returnProps); - ret = buildFilter(req, [](const cpp2::ScanEdgeRequest& r) -> const std::string* { + ret = buildFilter(req, [](const cpp2::ScanEdgeRequest& r, bool isVertex) -> const std::string* { + UNUSED(isVertex); if (r.filter_ref().has_value()) { return r.get_filter(); } else { diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index 2c1b611f8f2..6a3a168a337 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -68,7 +68,8 @@ nebula::cpp2::ErrorCode ScanVertexProcessor::checkAndBuildContexts( std::vector returnProps = *req.return_columns_ref(); ret = handleVertexProps(returnProps); buildTagColName(returnProps); - ret = buildFilter(req, [](const cpp2::ScanVertexRequest& r) -> const std::string* { + ret = buildFilter(req, [](const cpp2::ScanVertexRequest& r, bool isVertex) -> const std::string* { + UNUSED(isVertex); if (r.filter_ref().has_value()) { return r.get_filter(); } else { From b4ca1fe207ed8f0504188f09884c48126f7ae7ed Mon Sep 17 00:00:00 2001 From: jimingquan Date: Thu, 22 Sep 2022 20:18:31 +0800 Subject: [PATCH 2/5] add test case --- src/storage/exec/FilterNode.h | 8 ++++--- src/storage/test/GetNeighborsTest.cpp | 30 +++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/storage/exec/FilterNode.h b/src/storage/exec/FilterNode.h index 70059ee306b..e6f2f01e5f5 100644 --- a/src/storage/exec/FilterNode.h +++ b/src/storage/exec/FilterNode.h @@ -55,7 +55,9 @@ class FilterNode : public IterateNode { break; } if (this->valid() && !check()) { - context_->resultStat_ = ResultStatus::FILTER_OUT; + if (context_->resultStat_ != ResultStatus::TAG_FILTER_OUT) { + context_->resultStat_ = ResultStatus::FILTER_OUT; + } this->next(); continue; } @@ -114,8 +116,8 @@ class FilterNode : public IterateNode { private: RuntimeContext* context_; StorageExpressionContext* expCtx_; - Expression* filterExp_; - Expression* vertexFilterExp_; + Expression* filterExp_{nullptr}; + Expression* vertexFilterExp_{nullptr}; FilterMode mode_{FilterMode::TAG_AND_EDGE}; int32_t callCheck{0}; }; diff --git a/src/storage/test/GetNeighborsTest.cpp b/src/storage/test/GetNeighborsTest.cpp index c3b560732aa..3a6b6503e95 100644 --- a/src/storage/test/GetNeighborsTest.cpp +++ b/src/storage/test/GetNeighborsTest.cpp @@ -1787,6 +1787,36 @@ TEST(GetNeighborsTest, FilterTest) { expected.rows.emplace_back(std::move(row)); ASSERT_EQ(expected, *resp.vertices_ref()); } + { + LOG(INFO) << "Filter apply to vertices"; + std::vector vertices = {"Tim Duncan"}; + std::vector over = {serve}; + std::vector>> tags; + std::vector>> edges; + tags.emplace_back(player, std::vector{"name", "age"}); + edges.emplace_back(serve, std::vector{"teamName", "startYear", "endYear"}); + auto req = QueryTestUtils::buildRequest(totalParts, vertices, over, tags, edges); + // where $^.player.age > 50 + const auto& exp = *RelationalExpression::makeGT( + pool, + SourcePropertyExpression::make(pool, folly::to(player), "age"), + ConstantExpression::make(pool, Value(50))); + (*req.traverse_spec_ref()).filter_ref() = Expression::encode(exp); + (*req.traverse_spec_ref()).vertex_filter_ref() = Expression::encode(exp); + + auto* processor = GetNeighborsProcessor::instance(env, nullptr, threadPool.get()); + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size()); + // vId, stat, player, serve, expr + nebula::DataSet expected; + expected.colNames = { + kVid, "_stats", "_tag:1:name:age", "_edge:+101:teamName:startYear:endYear", "_expr"}; + ASSERT_EQ(expected.colNames, (*resp.vertices_ref()).colNames); + ASSERT_EQ(0, (*resp.vertices_ref()).rows.size()); + } { LOG(INFO) << "Filter apply to multi vertices"; std::vector vertices = { From b8790aab0c4eb52cd934ea9fe64b9cf04e1c540e Mon Sep 17 00:00:00 2001 From: jimingquan Date: Mon, 26 Sep 2022 10:42:30 +0800 Subject: [PATCH 3/5] add more unit test case --- src/storage/test/GetNeighborsTest.cpp | 38 +++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/storage/test/GetNeighborsTest.cpp b/src/storage/test/GetNeighborsTest.cpp index 3a6b6503e95..2d220cb95d1 100644 --- a/src/storage/test/GetNeighborsTest.cpp +++ b/src/storage/test/GetNeighborsTest.cpp @@ -1817,6 +1817,44 @@ TEST(GetNeighborsTest, FilterTest) { ASSERT_EQ(expected.colNames, (*resp.vertices_ref()).colNames); ASSERT_EQ(0, (*resp.vertices_ref()).rows.size()); } + { + LOG(INFO) << "Filter apply to vertices2"; + std::vector vertices = {"Tim Duncan", "Tony Parker"}; + std::vector over = {serve}; + std::vector>> tags; + std::vector>> edges; + tags.emplace_back(player, std::vector{"name", "age"}); + edges.emplace_back(serve, std::vector{"teamName", "startYear", "endYear"}); + auto req = QueryTestUtils::buildRequest(totalParts, vertices, over, tags, edges); + // where $^.player.age > 40 + const auto& exp = *RelationalExpression::makeGT( + pool, + SourcePropertyExpression::make(pool, folly::to(player), "age"), + ConstantExpression::make(pool, Value(40))); + (*req.traverse_spec_ref()).filter_ref() = Expression::encode(exp); + (*req.traverse_spec_ref()).vertex_filter_ref() = Expression::encode(exp); + + auto* processor = GetNeighborsProcessor::instance(env, nullptr, threadPool.get()); + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size()); + // vId, stat, player, serve, expr + nebula::DataSet expected; + expected.colNames = { + kVid, "_stats", "_tag:1:name:age", "_edge:+101:teamName:startYear:endYear", "_expr"}; + ASSERT_EQ(expected.colNames, (*resp.vertices_ref()).colNames); + auto serveEdges = nebula::List(); + serveEdges.values.emplace_back(nebula::List({"Spurs", 1997, 2016})); + nebula::Row row({"Tim Duncan", Value(), nebula::List({"Tim Duncan", 44}), serveEdges, Value()}); + for (size_t i = 0; i < 4; i++) { + if ((*resp.vertices_ref()).rows[i].values[0].getStr() == "Tim Duncan") { + ASSERT_EQ(row, (*resp.vertices_ref()).rows[i]); + break; + } + } + } { LOG(INFO) << "Filter apply to multi vertices"; std::vector vertices = { From 31aec11f9e1e02eea2dc7397ee8e5f94a2cd87f0 Mon Sep 17 00:00:00 2001 From: jimingquan Date: Mon, 26 Sep 2022 14:37:57 +0800 Subject: [PATCH 4/5] add comment --- src/interface/storage.thrift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 3ff610192e3..0aaadc1d2e9 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -158,7 +158,7 @@ struct TraverseSpec { 10: optional i64 limit, // If provided, only the rows satisfied the given expression will be returned 11: optional binary filter, - // only contain filter expression for vertex + // only contain filter expression for vertex, vertex_filter is a subset of filter 12: optional binary vertex_filter, } From 02a2b4130cfedcd376092726b5206e85d4dd58de Mon Sep 17 00:00:00 2001 From: jimingquan Date: Mon, 26 Sep 2022 16:37:47 +0800 Subject: [PATCH 5/5] address comment --- src/clients/storage/StorageClient.cpp | 6 +++--- src/clients/storage/StorageClient.h | 2 +- src/interface/storage.thrift | 4 ++-- src/storage/exec/FilterNode.h | 19 ++++++++++--------- src/storage/exec/GetNeighborsNode.h | 2 ++ src/storage/query/GetNeighborsProcessor.cpp | 15 ++++++--------- src/storage/query/GetPropProcessor.cpp | 4 ++-- src/storage/query/QueryBaseProcessor-inl.h | 10 +++++----- src/storage/query/QueryBaseProcessor.h | 4 ++-- src/storage/query/ScanEdgeProcessor.cpp | 4 ++-- src/storage/query/ScanVertexProcessor.cpp | 4 ++-- src/storage/test/GetNeighborsTest.cpp | 4 ++-- 12 files changed, 39 insertions(+), 39 deletions(-) diff --git a/src/clients/storage/StorageClient.cpp b/src/clients/storage/StorageClient.cpp index 57069e6c370..a86032b94e7 100644 --- a/src/clients/storage/StorageClient.cpp +++ b/src/clients/storage/StorageClient.cpp @@ -52,7 +52,7 @@ StorageRpcRespFuture StorageClient::getNeighbors( const std::vector& orderBy, int64_t limit, const Expression* filter, - const Expression* vertexFilter) { + const Expression* tagFilter) { auto cbStatus = getIdFromValue(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( @@ -98,8 +98,8 @@ StorageRpcRespFuture StorageClient::getNeighbors( if (filter != nullptr) { spec.filter_ref() = filter->encode(); } - if (vertexFilter != nullptr) { - spec.vertex_filter_ref() = vertexFilter->encode(); + if (tagFilter != nullptr) { + spec.tag_filter_ref() = tagFilter->encode(); } req.traverse_spec_ref() = std::move(spec); } diff --git a/src/clients/storage/StorageClient.h b/src/clients/storage/StorageClient.h index 442109c7e8b..f60dc7772f0 100644 --- a/src/clients/storage/StorageClient.h +++ b/src/clients/storage/StorageClient.h @@ -79,7 +79,7 @@ class StorageClient const std::vector& orderBy = std::vector(), int64_t limit = std::numeric_limits::max(), const Expression* filter = nullptr, - const Expression* vertexFilter = nullptr); + const Expression* tagFilter = nullptr); StorageRpcRespFuture getDstBySrc( const CommonRequestParam& param, diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 0aaadc1d2e9..42779735e32 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -158,8 +158,8 @@ struct TraverseSpec { 10: optional i64 limit, // If provided, only the rows satisfied the given expression will be returned 11: optional binary filter, - // only contain filter expression for vertex, vertex_filter is a subset of filter - 12: optional binary vertex_filter, + // only contain filter expression for tag, tag_filter is a subset of filter + 12: optional binary tag_filter, } diff --git a/src/storage/exec/FilterNode.h b/src/storage/exec/FilterNode.h index e6f2f01e5f5..667191997f7 100644 --- a/src/storage/exec/FilterNode.h +++ b/src/storage/exec/FilterNode.h @@ -39,8 +39,13 @@ class FilterNode : public IterateNode { FilterNode(RuntimeContext* context, IterateNode* upstream, StorageExpressionContext* expCtx = nullptr, - Expression* exp = nullptr) - : IterateNode(upstream), context_(context), expCtx_(expCtx), filterExp_(exp) { + Expression* exp = nullptr, + Expression* tagFilterExp = nullptr) + : IterateNode(upstream), + context_(context), + expCtx_(expCtx), + filterExp_(exp), + tagFilterExp_(tagFilterExp) { IterateNode::name_ = "FilterNode"; } @@ -70,10 +75,6 @@ class FilterNode : public IterateNode { mode_ = mode; } - void setVertexFilter(Expression* vertexFilter) { - vertexFilterExp_ = vertexFilter; - } - private: bool check() override { if (filterExp_ == nullptr) { @@ -99,8 +100,8 @@ class FilterNode : public IterateNode { // return true when the value iter points to a value which can filter bool checkTagAndEdge() { expCtx_->reset(this->reader(), this->key().str()); - if (vertexFilterExp_ != nullptr) { - auto res = vertexFilterExp_->eval(*expCtx_); + if (tagFilterExp_ != nullptr) { + auto res = tagFilterExp_->eval(*expCtx_); if (!res.isBool() || !res.getBool()) { context_->resultStat_ = ResultStatus::TAG_FILTER_OUT; return false; @@ -117,7 +118,7 @@ class FilterNode : public IterateNode { RuntimeContext* context_; StorageExpressionContext* expCtx_; Expression* filterExp_{nullptr}; - Expression* vertexFilterExp_{nullptr}; + Expression* tagFilterExp_{nullptr}; FilterMode mode_{FilterMode::TAG_AND_EDGE}; int32_t callCheck{0}; }; diff --git a/src/storage/exec/GetNeighborsNode.h b/src/storage/exec/GetNeighborsNode.h index 60f61167964..1125c53ea11 100644 --- a/src/storage/exec/GetNeighborsNode.h +++ b/src/storage/exec/GetNeighborsNode.h @@ -51,6 +51,8 @@ class GetNeighborsNode : public QueryNode { } if (context_->resultStat_ == ResultStatus::TAG_FILTER_OUT) { + // if the filter condition of the tag is not satisfied + // do not return the data for this vertex and corresponding edge return nebula::cpp2::ErrorCode::SUCCEEDED; } diff --git a/src/storage/query/GetNeighborsProcessor.cpp b/src/storage/query/GetNeighborsProcessor.cpp index a60ebf7fec9..367aa87c2b3 100644 --- a/src/storage/query/GetNeighborsProcessor.cpp +++ b/src/storage/query/GetNeighborsProcessor.cpp @@ -256,16 +256,13 @@ StoragePlan GetNeighborsProcessor::buildPlan(RuntimeContext* context, } if (filter_) { - auto filter = - std::make_unique>(context, upstream, expCtx, filter_->clone()); + auto filter = std::make_unique>( + context, upstream, expCtx, filter_->clone(), tagFilter_ ? tagFilter_->clone() : nullptr); filter->addDependency(upstream); upstream = filter.get(); if (edges.empty()) { filter->setFilterMode(FilterMode::TAG_ONLY); } - if (vertexFilter_) { - filter->setVertexFilter(vertexFilter_->clone()); - } plan.addNode(std::move(filter)); } @@ -317,10 +314,10 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::checkAndBuildContexts( return code; } code = - buildFilter(req, [](const cpp2::GetNeighborsRequest& r, bool isVertex) -> const std::string* { - if (isVertex) { - return r.get_traverse_spec().vertex_filter_ref().has_value() - ? r.get_traverse_spec().get_vertex_filter() + buildFilter(req, [](const cpp2::GetNeighborsRequest& r, bool onlyTag) -> const std::string* { + if (onlyTag) { + return r.get_traverse_spec().tag_filter_ref().has_value() + ? r.get_traverse_spec().get_tag_filter() : nullptr; } return r.get_traverse_spec().filter_ref().has_value() ? r.get_traverse_spec().get_filter() diff --git a/src/storage/query/GetPropProcessor.cpp b/src/storage/query/GetPropProcessor.cpp index cb5bdb5c641..21231ef8217 100644 --- a/src/storage/query/GetPropProcessor.cpp +++ b/src/storage/query/GetPropProcessor.cpp @@ -272,8 +272,8 @@ nebula::cpp2::ErrorCode GetPropProcessor::checkAndBuildContexts(const cpp2::GetP return code; } } - code = buildFilter(req, [](const cpp2::GetPropRequest& r, bool isVertex) -> const std::string* { - UNUSED(isVertex); + code = buildFilter(req, [](const cpp2::GetPropRequest& r, bool onlyTag) -> const std::string* { + UNUSED(onlyTag); if (r.filter_ref().has_value()) { return r.get_filter(); } else { diff --git a/src/storage/query/QueryBaseProcessor-inl.h b/src/storage/query/QueryBaseProcessor-inl.h index 81acf88774e..30402b88904 100644 --- a/src/storage/query/QueryBaseProcessor-inl.h +++ b/src/storage/query/QueryBaseProcessor-inl.h @@ -138,7 +138,7 @@ nebula::cpp2::ErrorCode QueryBaseProcessor::buildYields(const REQ& re template nebula::cpp2::ErrorCode QueryBaseProcessor::buildFilter( - const REQ& req, std::function&& getFilter) { + const REQ& req, std::function&& getFilter) { const auto* filterStr = getFilter(req, false); if (filterStr == nullptr) { return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -152,10 +152,10 @@ nebula::cpp2::ErrorCode QueryBaseProcessor::buildFilter( if (filter_ == nullptr) { return nebula::cpp2::ErrorCode::E_INVALID_FILTER; } - const auto* vertexFilterStr = getFilter(req, true); - if (vertexFilterStr != nullptr && !vertexFilterStr->empty()) { - vertexFilter_ = Expression::decode(pool, *vertexFilterStr); - if (vertexFilter_ == nullptr) { + const auto* tagFilterStr = getFilter(req, true); + if (tagFilterStr != nullptr && !tagFilterStr->empty()) { + tagFilter_ = Expression::decode(pool, *tagFilterStr); + if (tagFilter_ == nullptr) { return nebula::cpp2::ErrorCode::E_INVALID_FILTER; } } diff --git a/src/storage/query/QueryBaseProcessor.h b/src/storage/query/QueryBaseProcessor.h index 00dcab38b75..de36eb33ff9 100644 --- a/src/storage/query/QueryBaseProcessor.h +++ b/src/storage/query/QueryBaseProcessor.h @@ -169,7 +169,7 @@ class QueryBaseProcessor : public BaseProcessor { nebula::cpp2::ErrorCode handleEdgeProps(std::vector& edgeProps); nebula::cpp2::ErrorCode buildFilter( - const REQ& req, std::function&& getFilter); + const REQ& req, std::function&& getFilter); nebula::cpp2::ErrorCode buildYields(const REQ& req); // build ttl info map @@ -207,7 +207,7 @@ class QueryBaseProcessor : public BaseProcessor { TagContext tagContext_; EdgeContext edgeContext_; Expression* filter_{nullptr}; - Expression* vertexFilter_{nullptr}; + Expression* tagFilter_{nullptr}; // Collect prop in value expression in upsert set clause std::unordered_set valueProps_; diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index f595637a4be..48d5f3044fa 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -65,8 +65,8 @@ nebula::cpp2::ErrorCode ScanEdgeProcessor::checkAndBuildContexts(const cpp2::Sca std::vector returnProps = *req.return_columns_ref(); ret = handleEdgeProps(returnProps); buildEdgeColName(returnProps); - ret = buildFilter(req, [](const cpp2::ScanEdgeRequest& r, bool isVertex) -> const std::string* { - UNUSED(isVertex); + ret = buildFilter(req, [](const cpp2::ScanEdgeRequest& r, bool onlyTag) -> const std::string* { + UNUSED(onlyTag); if (r.filter_ref().has_value()) { return r.get_filter(); } else { diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index 6a3a168a337..f969d1373a6 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -68,8 +68,8 @@ nebula::cpp2::ErrorCode ScanVertexProcessor::checkAndBuildContexts( std::vector returnProps = *req.return_columns_ref(); ret = handleVertexProps(returnProps); buildTagColName(returnProps); - ret = buildFilter(req, [](const cpp2::ScanVertexRequest& r, bool isVertex) -> const std::string* { - UNUSED(isVertex); + ret = buildFilter(req, [](const cpp2::ScanVertexRequest& r, bool onlyTag) -> const std::string* { + UNUSED(onlyTag); if (r.filter_ref().has_value()) { return r.get_filter(); } else { diff --git a/src/storage/test/GetNeighborsTest.cpp b/src/storage/test/GetNeighborsTest.cpp index 2d220cb95d1..3a0f9826a18 100644 --- a/src/storage/test/GetNeighborsTest.cpp +++ b/src/storage/test/GetNeighborsTest.cpp @@ -1802,7 +1802,7 @@ TEST(GetNeighborsTest, FilterTest) { SourcePropertyExpression::make(pool, folly::to(player), "age"), ConstantExpression::make(pool, Value(50))); (*req.traverse_spec_ref()).filter_ref() = Expression::encode(exp); - (*req.traverse_spec_ref()).vertex_filter_ref() = Expression::encode(exp); + (*req.traverse_spec_ref()).tag_filter_ref() = Expression::encode(exp); auto* processor = GetNeighborsProcessor::instance(env, nullptr, threadPool.get()); auto fut = processor->getFuture(); @@ -1832,7 +1832,7 @@ TEST(GetNeighborsTest, FilterTest) { SourcePropertyExpression::make(pool, folly::to(player), "age"), ConstantExpression::make(pool, Value(40))); (*req.traverse_spec_ref()).filter_ref() = Expression::encode(exp); - (*req.traverse_spec_ref()).vertex_filter_ref() = Expression::encode(exp); + (*req.traverse_spec_ref()).tag_filter_ref() = Expression::encode(exp); auto* processor = GetNeighborsProcessor::instance(env, nullptr, threadPool.get()); auto fut = processor->getFuture();