Skip to content

Commit

Permalink
Feature/getprops limit push down (#3839)
Browse files Browse the repository at this point in the history
* Support limit push down in GetProps.

* Add tests.

* Remove unused code.

* Fix typo.

Co-authored-by: Sophie <[email protected]>
  • Loading branch information
Shylock-Hg and Sophie-Xie authored Feb 22, 2022
1 parent c31b6bf commit 7c0d0da
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/graph/planner/plan/Query.h
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,7 @@ class AppendVertices final : public GetVertices {
nullptr,
false,
{},
0,
-1, // means no limit
nullptr) {}

void cloneMembers(const AppendVertices& a);
Expand Down
24 changes: 20 additions & 4 deletions src/storage/exec/GetPropNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ class GetTagPropNode : public QueryNode<VertexID> {

GetTagPropNode(RuntimeContext* context,
std::vector<TagNode*> tagNodes,
nebula::DataSet* resultDataSet)
: context_(context), tagNodes_(std::move(tagNodes)), resultDataSet_(resultDataSet) {
nebula::DataSet* resultDataSet,
std::size_t limit)
: context_(context),
tagNodes_(std::move(tagNodes)),
resultDataSet_(resultDataSet),
limit_(limit) {
name_ = "GetTagPropNode";
}

nebula::cpp2::ErrorCode doExecute(PartitionID partId, const VertexID& vId) override {
if (resultDataSet_->size() >= limit_) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
auto ret = RelNode::doExecute(partId, vId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
Expand Down Expand Up @@ -85,6 +92,7 @@ class GetTagPropNode : public QueryNode<VertexID> {
RuntimeContext* context_;
std::vector<TagNode*> tagNodes_;
nebula::DataSet* resultDataSet_;
const std::size_t limit_{std::numeric_limits<std::size_t>::max()};
};

class GetEdgePropNode : public QueryNode<cpp2::EdgeKey> {
Expand All @@ -93,12 +101,19 @@ class GetEdgePropNode : public QueryNode<cpp2::EdgeKey> {

GetEdgePropNode(RuntimeContext* context,
std::vector<EdgeNode<cpp2::EdgeKey>*> edgeNodes,
nebula::DataSet* resultDataSet)
: context_(context), edgeNodes_(std::move(edgeNodes)), resultDataSet_(resultDataSet) {
nebula::DataSet* resultDataSet,
std::size_t limit)
: context_(context),
edgeNodes_(std::move(edgeNodes)),
resultDataSet_(resultDataSet),
limit_(limit) {
QueryNode::name_ = "GetEdgePropNode";
}

nebula::cpp2::ErrorCode doExecute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override {
if (resultDataSet_->size() >= limit_) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
auto ret = RelNode::doExecute(partId, edgeKey);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
Expand Down Expand Up @@ -138,6 +153,7 @@ class GetEdgePropNode : public QueryNode<cpp2::EdgeKey> {
RuntimeContext* context_;
std::vector<EdgeNode<cpp2::EdgeKey>*> edgeNodes_;
nebula::DataSet* resultDataSet_;
const std::size_t limit_{std::numeric_limits<std::size_t>::max()};
};

} // namespace storage
Expand Down
7 changes: 5 additions & 2 deletions src/storage/query/GetPropProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ void GetPropProcessor::process(const cpp2::GetPropRequest& req) {

void GetPropProcessor::doProcess(const cpp2::GetPropRequest& req) {
spaceId_ = req.get_space_id();
// Negative number means no limit
const auto rawLimit = req.limit_ref().value_or(-1);
limit_ = rawLimit < 0 ? std::numeric_limits<int64_t>::max() : rawLimit;
auto retCode = getSpaceVidLen(spaceId_);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
for (auto& p : req.get_parts()) {
Expand Down Expand Up @@ -199,7 +202,7 @@ StoragePlan<VertexID> GetPropProcessor::buildTagPlan(RuntimeContext* context,
tags.emplace_back(tag.get());
plan.addNode(std::move(tag));
}
auto output = std::make_unique<GetTagPropNode>(context, tags, result);
auto output = std::make_unique<GetTagPropNode>(context, tags, result, limit_);
for (auto* tag : tags) {
output->addDependency(tag);
}
Expand All @@ -216,7 +219,7 @@ StoragePlan<cpp2::EdgeKey> GetPropProcessor::buildEdgePlan(RuntimeContext* conte
edges.emplace_back(edge.get());
plan.addNode(std::move(edge));
}
auto output = std::make_unique<GetEdgePropNode>(context, edges, result);
auto output = std::make_unique<GetEdgePropNode>(context, edges, result, limit_);
for (auto* edge : edges) {
output->addDependency(edge);
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/query/GetPropProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class GetPropProcessor : public QueryBaseProcessor<cpp2::GetPropRequest, cpp2::G
std::vector<RuntimeContext> contexts_;
std::vector<nebula::DataSet> results_;
bool isEdge_ = false; // true for edge, false for tag
std::size_t limit_{std::numeric_limits<std::size_t>::max()};
};

} // namespace storage
Expand Down
173 changes: 171 additions & 2 deletions src/storage/test/GetPropTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ namespace storage {
cpp2::GetPropRequest buildVertexRequest(
int32_t totalParts,
const std::vector<VertexID>& vertices,
const std::vector<std::pair<TagID, std::vector<std::string>>>& tags) {
const std::vector<std::pair<TagID, std::vector<std::string>>>& tags,
int64_t limit = -1) {
std::hash<std::string> hash;
cpp2::GetPropRequest req;
req.space_id_ref() = 1;
req.limit_ref() = limit;
for (const auto& vertex : vertices) {
PartitionID partId = (hash(vertex) % totalParts) + 1;
nebula::Row row;
Expand Down Expand Up @@ -48,9 +50,11 @@ cpp2::GetPropRequest buildVertexRequest(
cpp2::GetPropRequest buildEdgeRequest(
int32_t totalParts,
const std::vector<cpp2::EdgeKey>& edgeKeys,
const std::vector<std::pair<EdgeType, std::vector<std::string>>>& edges) {
const std::vector<std::pair<EdgeType, std::vector<std::string>>>& edges,
int64_t limit = -1) {
cpp2::GetPropRequest req;
req.space_id_ref() = 1;
req.limit_ref() = limit;
for (const auto& edge : edgeKeys) {
PartitionID partId = (std::hash<Value>()(edge.get_src()) % totalParts) + 1;
nebula::Row row;
Expand Down Expand Up @@ -703,6 +707,171 @@ TEST(QueryVertexPropsTest, PrefixBloomFilterTest) {
FLAGS_enable_rocksdb_prefix_filtering = false;
}

TEST(GetPropTest, LimitTest) {
fs::TempDir rootPath("/tmp/GetPropTest.XXXXXX");
mock::MockCluster cluster;
cluster.initStorageKV(rootPath.path());
auto* env = cluster.storageEnv_.get();
auto totalParts = cluster.getTotalParts();
ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts));
ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts));

TagID player = 1;
EdgeType serve = 101;

// vertex
{
std::vector<VertexID> vertices = {"Tim Duncan"};
std::vector<std::pair<TagID, std::vector<std::string>>> tags;
tags.emplace_back(player, std::vector<std::string>{"name", "age", "avgScore"});
auto req = buildVertexRequest(totalParts, vertices, tags, 0);

auto* processor = GetPropProcessor::instance(env, nullptr, nullptr);
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();

ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size());
nebula::DataSet expected;
expected.colNames = {kVid, "1.name", "1.age", "1.avgScore"};
ASSERT_EQ(expected, *resp.props_ref());
}
{
std::vector<VertexID> vertices = {"Tim Duncan", "Tony Parker"};
std::vector<std::pair<TagID, std::vector<std::string>>> tags;
tags.emplace_back(player, std::vector<std::string>{"name", "age", "avgScore"});
auto req = buildVertexRequest(totalParts, vertices, tags, 1);

auto* processor = GetPropProcessor::instance(env, nullptr, nullptr);
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();

ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size());
ASSERT_EQ((*resp.props_ref()).size(), 1);
}
{
std::vector<VertexID> vertices = {"Tim Duncan", "Tony Parker"};
std::vector<std::pair<TagID, std::vector<std::string>>> tags;
tags.emplace_back(player, std::vector<std::string>{"name", "age", "avgScore"});
auto req = buildVertexRequest(totalParts, vertices, tags, 3);

auto* processor = GetPropProcessor::instance(env, nullptr, nullptr);
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();

ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size());
nebula::DataSet expected;
expected.colNames = {kVid, "1.name", "1.age", "1.avgScore"};
expected.emplace_back(Row({"Tim Duncan", "Tim Duncan", 44, 19.0}));
expected.emplace_back(Row({"Tony Parker", "Tony Parker", 38, 15.5}));
}
// edge
{
std::vector<cpp2::EdgeKey> edgeKeys;
{
cpp2::EdgeKey edgeKey;
edgeKey.src_ref() = "Tim Duncan";
edgeKey.edge_type_ref() = 101;
edgeKey.ranking_ref() = 1997;
edgeKey.dst_ref() = "Spurs";
edgeKeys.emplace_back(std::move(edgeKey));
}
{
cpp2::EdgeKey edgeKey;
edgeKey.src_ref() = "Tony Parker";
edgeKey.edge_type_ref() = 101;
edgeKey.ranking_ref() = 2001;
edgeKey.dst_ref() = "Spurs";
edgeKeys.emplace_back(std::move(edgeKey));
}
std::vector<std::pair<TagID, std::vector<std::string>>> edges;
edges.emplace_back(serve, std::vector<std::string>{"teamName", "startYear", "endYear"});
auto req = buildEdgeRequest(totalParts, edgeKeys, edges, 0);

auto* processor = GetPropProcessor::instance(env, nullptr, nullptr);
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();

ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size());
nebula::DataSet expected;
expected.colNames = {"101.teamName", "101.startYear", "101.endYear"};
ASSERT_EQ(expected, *resp.props_ref());
}
{
std::vector<cpp2::EdgeKey> edgeKeys;
{
cpp2::EdgeKey edgeKey;
edgeKey.src_ref() = "Tim Duncan";
edgeKey.edge_type_ref() = 101;
edgeKey.ranking_ref() = 1997;
edgeKey.dst_ref() = "Spurs";
edgeKeys.emplace_back(std::move(edgeKey));
}
{
cpp2::EdgeKey edgeKey;
edgeKey.src_ref() = "Tony Parker";
edgeKey.edge_type_ref() = 101;
edgeKey.ranking_ref() = 2001;
edgeKey.dst_ref() = "Spurs";
edgeKeys.emplace_back(std::move(edgeKey));
}
std::vector<std::pair<TagID, std::vector<std::string>>> edges;
edges.emplace_back(serve, std::vector<std::string>{"teamName", "startYear", "endYear"});
auto req = buildEdgeRequest(totalParts, edgeKeys, edges, 1);

auto* processor = GetPropProcessor::instance(env, nullptr, nullptr);
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();

ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size());
ASSERT_EQ((*resp.props_ref()).size(), 1);
}
{
std::vector<cpp2::EdgeKey> edgeKeys;
{
cpp2::EdgeKey edgeKey;
edgeKey.src_ref() = "Tim Duncan";
edgeKey.edge_type_ref() = 101;
edgeKey.ranking_ref() = 1997;
edgeKey.dst_ref() = "Spurs";
edgeKeys.emplace_back(std::move(edgeKey));
}
{
cpp2::EdgeKey edgeKey;
edgeKey.src_ref() = "Tony Parker";
edgeKey.edge_type_ref() = 101;
edgeKey.ranking_ref() = 2001;
edgeKey.dst_ref() = "Spurs";
edgeKeys.emplace_back(std::move(edgeKey));
}
std::vector<std::pair<TagID, std::vector<std::string>>> edges;
edges.emplace_back(serve, std::vector<std::string>{"teamName", "startYear", "endYear"});
auto req = buildEdgeRequest(totalParts, edgeKeys, edges, 3);

auto* processor = GetPropProcessor::instance(env, nullptr, nullptr);
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();

ASSERT_EQ(0, (*resp.result_ref()).failed_parts.size());
nebula::DataSet expected;
expected.colNames = {"101.teamName", "101.startYear", "101.endYear"};
{
nebula::Row row({"Spurs", 2001, 2018});
expected.rows.emplace_back(std::move(row));
}
{
nebula::Row row({"Spurs", 1997, 2016});
expected.rows.emplace_back(std::move(row));
}
ASSERT_EQ(expected, *resp.props_ref());
}
}

} // namespace storage
} // namespace nebula

Expand Down

0 comments on commit 7c0d0da

Please sign in to comment.