Skip to content

Commit

Permalink
refactor subgraph
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Feb 10, 2022
1 parent 4e1dee5 commit 3de4dba
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 48 deletions.
1 change: 1 addition & 0 deletions src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ List GetNeighborsIter::getVertices() {
valid_ = true;
colIdx_ = -2;
for (currentDs_ = dsIndices_.begin(); currentDs_ < dsIndices_.end(); ++currentDs_) {
rowsUpperBound_ = currentDs_->ds->rows.end();
for (currentRow_ = currentDs_->ds->rows.begin(); currentRow_ < currentDs_->ds->rows.end();
++currentRow_) {
vertices.values.emplace_back(getVertex());
Expand Down
1 change: 1 addition & 0 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ struct SubgraphContext final : public AstContext {
YieldColumns* yieldExpr;
std::vector<std::string> colNames;
std::unordered_set<EdgeType> edgeTypes;
std::unordered_set<EdgeType> biDirectEdgeTypes;
bool withProp{false};
bool getVertexProp{false};
bool getEdgeProp{false};
Expand Down
52 changes: 41 additions & 11 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,43 @@ folly::Future<Status> SubgraphExecutor::execute() {

VLOG(1) << "input: " << subgraph->inputVar() << " output: " << node()->outputVar();
auto iter = ectx_->getResult(subgraph->inputVar()).iter();
DCHECK(iter && iter->isGetNeighborsIter());
std::unordered_set<Value> currentVids;
currentVids.reserve(iter->size());
historyVids_.reserve(historyVids_.size() + iter->size());
if (currentStep == 1) {
for (; iter->valid(); iter->next()) {
const auto& src = iter->getColumn(nebula::kVid);
historyVids_.emplace(src);
currentVids.emplace(src);
}
iter->reset();
}
for (; iter->valid(); iter->next()) {
auto& biDirectEdgeTypes = subgraph->biDirectEdgeTypes();
while (iter->valid()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
if (historyVids_.emplace(dst).second) {
Row row;
row.values.emplace_back(std::move(dst));
ds.rows.emplace_back(std::move(row));
if (historyVids_.find(dst) != historyVids_.end()) {
if (biDirectEdgeTypes.empty()) {
iter->next();
} else {
const auto& type = iter->getEdgeProp("*", nebula::kType);
if (type.isInt() && biDirectEdgeTypes.find(type.getInt()) != biDirectEdgeTypes.end()) {
iter->erase();
} else {
iter->next();
}
}
} else {
if (currentVids.emplace(dst).second) {
Row row;
row.values.emplace_back(std::move(dst));
ds.rows.emplace_back(std::move(row));
}
iter->next();
}
}

iter->reset();
// update historyVids
historyVids_.insert(std::make_move_iterator(currentVids.begin()),
std::make_move_iterator(currentVids.end()));
VLOG(1) << "Next step vid is : " << ds;
return finish(ResultBuilder().value(Value(std::move(ds))).build());
}
Expand All @@ -56,15 +76,25 @@ void SubgraphExecutor::oneMoreStep() {
VLOG(1) << "OneMoreStep Input: " << subgraph->inputVar() << " Output: " << output;
auto iter = ectx_->getResult(subgraph->inputVar()).iter();
DCHECK(iter && iter->isGetNeighborsIter());
auto& biDirectEdgeTypes = subgraph->biDirectEdgeTypes();

ResultBuilder builder;
builder.value(iter->valuePtr());
while (iter->valid()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
if (historyVids_.find(dst) == historyVids_.end()) {
iter->unstableErase();
if (historyVids_.find(dst) != historyVids_.end()) {
if (biDirectEdgeTypes.empty()) {
iter->next();
} else {
const auto& type = iter->getEdgeProp("*", nebula::kType);
if (type.isInt() && biDirectEdgeTypes.find(type.getInt()) != biDirectEdgeTypes.end()) {
iter->erase();
} else {
iter->next();
}
}
} else {
iter->next();
iter->erase();
}
}
iter->reset();
Expand Down
50 changes: 16 additions & 34 deletions src/graph/executor/query/DataCollectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "common/time/ScopedTimer.h"
#include "graph/planner/plan/Query.h"

DEFINE_bool(test_subgraph, false, "reserve uniqueEdges 's size");
namespace nebula {
namespace graph {
folly::Future<Status> DataCollectExecutor::execute() {
Expand Down Expand Up @@ -64,9 +63,6 @@ folly::Future<Status> DataCollectExecutor::doCollect() {
Status DataCollectExecutor::collectSubgraph(const std::vector<std::string>& vars) {
DataSet ds;
ds.colNames = std::move(colNames_);
size_t numEdges = 0;
std::vector<std::unique_ptr<Iterator>> iters;
iters.reserve(vars.size());
for (auto i = vars.begin(); i != vars.end(); ++i) {
const auto& hist = ectx_->getHistory(*i);
for (auto j = hist.begin(); j != hist.end(); ++j) {
Expand All @@ -75,40 +71,26 @@ Status DataCollectExecutor::collectSubgraph(const std::vector<std::string>& vars
}
auto iter = (*j).iter();
auto* gnIter = static_cast<GetNeighborsIter*>(iter.get());
numEdges += gnIter->size();
iters.emplace_back(std::move(iter));
}
}

std::unordered_set<std::tuple<Value, EdgeType, EdgeRanking, Value>> uniqueEdges;
if (FLAGS_test_subgraph) {
uniqueEdges.reserve(numEdges);
}
for (const auto& iter : iters) {
List vertices;
List edges;
auto* gnIter = static_cast<GetNeighborsIter*>(iter.get());
auto originVertices = gnIter->getVertices();
vertices.reserve(originVertices.size());
for (auto& v : originVertices.values) {
if (UNLIKELY(!v.isVertex())) {
continue;
}
vertices.emplace_back(std::move(v));
}
auto originEdges = gnIter->getEdges();
edges.reserve(originEdges.size());
for (auto& edge : originEdges.values) {
if (UNLIKELY(!edge.isEdge())) {
continue;
List vertices;
List edges;
auto originVertices = gnIter->getVertices();
vertices.reserve(originVertices.size());
for (auto& v : originVertices.values) {
if (UNLIKELY(!v.isVertex())) {
continue;
}
vertices.emplace_back(std::move(v));
}
const auto& e = edge.getEdge();
auto edgeKey = std::make_tuple(e.src, e.type, e.ranking, e.dst);
if (uniqueEdges.emplace(std::move(edgeKey)).second) {
auto originEdges = gnIter->getEdges();
edges.reserve(originEdges.size());
for (auto& edge : originEdges.values) {
if (UNLIKELY(!edge.isEdge())) {
continue;
}
edges.emplace_back(std::move(edge));
}
ds.rows.emplace_back(Row({std::move(vertices), std::move(edges)}));
}
ds.rows.emplace_back(Row({std::move(vertices), std::move(edges)}));
}
result_.setDataSet(std::move(ds));
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/query/GetNeighborsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ Status GetNeighborsExecutor::handleResponse(RpcResponse& resps) {

list.values.emplace_back(std::move(*dataset));
}
VLOG(1) << "GetNeightbors Result : " << list;
builder.value(Value(std::move(list))).iter(Iterator::Kind::kGetNeighbors);
return finish(builder.build());
}
Expand Down
4 changes: 4 additions & 0 deletions src/graph/planner/ngql/SubgraphPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ StatusOr<std::unique_ptr<std::vector<EdgeProp>>> SubgraphPlanner::buildEdgeProps
bool getEdgeProp = subgraphCtx_->withProp && subgraphCtx_->getEdgeProp;
const auto& space = subgraphCtx_->space;
auto& edgeTypes = subgraphCtx_->edgeTypes;
auto& biDirectEdgeTypes = subgraphCtx_->biDirectEdgeTypes;

if (edgeTypes.empty()) {
const auto allEdgesSchema = qctx->schemaMng()->getAllLatestVerEdgeSchema(space.id);
Expand All @@ -27,6 +28,8 @@ StatusOr<std::unique_ptr<std::vector<EdgeProp>>> SubgraphPlanner::buildEdgeProps
for (const auto& edge : allEdges) {
edgeTypes.emplace(edge.first);
edgeTypes.emplace(-edge.first);
biDirectEdgeTypes.emplace(edge.first);
biDirectEdgeTypes.emplace(-edge.first);
}
}
std::vector<EdgeType> vEdgeTypes(edgeTypes.begin(), edgeTypes.end());
Expand Down Expand Up @@ -70,6 +73,7 @@ StatusOr<SubPlan> SubgraphPlanner::nSteps(SubPlan& startVidPlan, const std::stri
subgraphCtx_->loopSteps = loopSteps;
auto* subgraph = Subgraph::make(qctx, gn, oneMoreStepOutput, loopSteps, steps.steps() + 1);
subgraph->setOutputVar(input);
subgraph->setBiDirectEdgeTypes(subgraphCtx_->biDirectEdgeTypes);
subgraph->setColNames({nebula::kVid});

auto* condition = loopCondition(steps.steps() + 1, gn->outputVar());
Expand Down
9 changes: 9 additions & 0 deletions src/graph/planner/plan/Algo.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ class Subgraph final : public SingleInputNode {
return steps_;
}

const std::unordered_set<EdgeType> biDirectEdgeTypes() const {
return biDirectEdgeTypes_;
}

// Replaces the stored edge-type set with `edgeTypes`.
// The argument is taken by value (sink parameter); its contents are exchanged
// with the member, and the previous contents are released when the parameter
// goes out of scope at the end of this call.
void setBiDirectEdgeTypes(std::unordered_set<EdgeType> edgeTypes) {
  biDirectEdgeTypes_.swap(edgeTypes);
}

private:
Subgraph(QueryContext* qctx,
PlanNode* input,
Expand All @@ -167,6 +175,7 @@ class Subgraph final : public SingleInputNode {
std::string oneMoreStepOutput_;
std::string currentStepVar_;
uint32_t steps_;
std::unordered_set<EdgeType> biDirectEdgeTypes_;
};

class BiCartesianProduct final : public BinaryInputNode {
Expand Down
9 changes: 6 additions & 3 deletions src/graph/validator/GetSubgraphValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ Status GetSubgraphValidator::validateOutBound(OutBoundClause* out) {

Status GetSubgraphValidator::validateBothInOutBound(BothInOutClause* out) {
auto& edgeTypes = subgraphCtx_->edgeTypes;
auto& biEdgeTypes = subgraphCtx_->biDirectEdgeTypes;
if (out != nullptr) {
auto space = vctx_->whichSpace();
auto edges = out->edges();
edgeTypes.reserve(edgeTypes.size() + edges.size());
edgeTypes.reserve(edgeTypes.size() + edges.size() * 2);
biEdgeTypes.reserve(edges.size() * 2);
for (auto* e : out->edges()) {
if (e->alias() != nullptr) {
return Status::SemanticError("Get Subgraph not support rename edge name.");
Expand All @@ -86,8 +88,9 @@ Status GetSubgraphValidator::validateBothInOutBound(BothInOutClause* out) {

auto v = et.value();
edgeTypes.emplace(v);
v = -v;
edgeTypes.emplace(v);
edgeTypes.emplace(-v);
biEdgeTypes.emplace(v);
biEdgeTypes.emplace(-v);
}
}
return Status::OK();
Expand Down
2 changes: 2 additions & 0 deletions tests/tck/features/subgraph/subgraph.feature
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2021 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.
@jmq
Feature: subgraph

Background:
Expand Down Expand Up @@ -325,6 +326,7 @@ Feature: subgraph
| <[vertex2]> | <[edge2]> |
| <[vertex3]> | <[edge3]> |

@jmm
Scenario: two steps in and out edge
When executing query:
"""
Expand Down

0 comments on commit 3de4dba

Please sign in to comment.