Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize subgraph #3871

Merged
merged 12 commits into from
Mar 2, 2022
25 changes: 25 additions & 0 deletions src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,29 @@ void GetNeighborsIter::next() {
}
}

// Returns the number of edges reachable through this iterator.
// For every row of every indexed dataset, each edge column recorded in
// edgePropsMap is inspected; when the cell holds a list, the list length is
// added to the total. Cells that are not lists contribute nothing.
size_t GetNeighborsIter::size() const {
  size_t total = 0;
  for (const auto& index : dsIndices_) {
    const auto& edgeColumns = index.edgePropsMap;
    for (const auto& currentRow : index.ds->rows) {
      for (const auto& column : edgeColumns) {
        const auto& value = currentRow[column.second.colIdx];
        // Edge columns are expected to hold lists; anything else is skipped.
        if (LIKELY(value.isList())) {
          total += value.getList().size();
        }
      }
    }
  }
  return total;
}

// Returns the combined row count of all indexed datasets, i.e. the sum of
// ds->size() over every entry in dsIndices_.
size_t GetNeighborsIter::numRows() const {
  size_t total = 0;
  for (auto it = dsIndices_.begin(); it != dsIndices_.end(); ++it) {
    total += it->ds->size();
  }
  return total;
}

void GetNeighborsIter::erase() {
DCHECK_GE(bitIdx_, 0);
DCHECK_LT(bitIdx_, bitset_.size());
Expand Down Expand Up @@ -441,6 +464,7 @@ Value GetNeighborsIter::getVertex(const std::string& name) const {

List GetNeighborsIter::getVertices() {
List vertices;
vertices.reserve(numRows());
valid_ = true;
colIdx_ = -2;
for (currentDs_ = dsIndices_.begin(); currentDs_ < dsIndices_.end(); ++currentDs_) {
Expand Down Expand Up @@ -512,6 +536,7 @@ Value GetNeighborsIter::getEdge() const {

List GetNeighborsIter::getEdges() {
List edges;
edges.reserve(size());
for (; valid(); next()) {
auto edge = getEdge();
if (edge.isEdge()) {
Expand Down
8 changes: 5 additions & 3 deletions src/graph/context/Iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,11 @@ class GetNeighborsIter final : public Iterator {

void sample(int64_t count) override;

size_t size() const override {
LOG(FATAL) << "Unimplemented method for Get Neighbros iterator.";
}
// num of edges
size_t size() const override;

// num of vertices
size_t numRows() const;

const Value& getColumn(const std::string& col) const override;

Expand Down
3 changes: 2 additions & 1 deletion src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ struct SubgraphContext final : public AstContext {
Starts from;
StepClause steps;
std::string loopSteps;
YieldColumns* yieldExpr;
std::vector<std::string> colNames;
std::unordered_set<EdgeType> edgeTypes;
std::unordered_set<EdgeType> biDirectEdgeTypes;
std::vector<Value::Type> colType;
bool withProp{false};
bool getVertexProp{false};
bool getEdgeProp{false};
Expand Down
78 changes: 46 additions & 32 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,69 @@ folly::Future<Status> SubgraphExecutor::execute() {
DCHECK(currentStepVal.isInt());
auto currentStep = currentStepVal.getInt();
VLOG(1) << "Current Step is: " << currentStep << " Total Steps is: " << steps;

if (currentStep == steps) {
oneMoreStep();
return finish(ResultBuilder().value(Value(std::move(ds))).build());
}
auto resultVar = subgraph->resultVar();

VLOG(1) << "input: " << subgraph->inputVar() << " output: " << node()->outputVar();
auto iter = ectx_->getResult(subgraph->inputVar()).iter();
DCHECK(iter && iter->isGetNeighborsIter());
auto gnSize = iter->size();

ResultBuilder builder;
builder.value(iter->valuePtr());

std::unordered_map<Value, int64_t> currentVids;
currentVids.reserve(gnSize);
historyVids_.reserve(historyVids_.size() + gnSize);
if (currentStep == 1) {
for (; iter->valid(); iter->next()) {
const auto& src = iter->getColumn(nebula::kVid);
historyVids_.emplace(src);
currentVids.emplace(src, 0);
}
iter->reset();
}
for (; iter->valid(); iter->next()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
if (historyVids_.emplace(dst).second) {
Row row;
row.values.emplace_back(std::move(dst));
ds.rows.emplace_back(std::move(row));
}
}

VLOG(1) << "Next step vid is : " << ds;
return finish(ResultBuilder().value(Value(std::move(ds))).build());
}

void SubgraphExecutor::oneMoreStep() {
auto* subgraph = asNode<Subgraph>(node());
auto output = subgraph->oneMoreStepOutput();
VLOG(1) << "OneMoreStep Input: " << subgraph->inputVar() << " Output: " << output;
auto iter = ectx_->getResult(subgraph->inputVar()).iter();
DCHECK(iter && iter->isGetNeighborsIter());

ResultBuilder builder;
builder.value(iter->valuePtr());
auto& biDirectEdgeTypes = subgraph->biDirectEdgeTypes();
while (iter->valid()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
if (historyVids_.find(dst) == historyVids_.end()) {
iter->unstableErase();
auto findIter = historyVids_.find(dst);
if (findIter != historyVids_.end()) {
if (biDirectEdgeTypes.empty()) {
iter->next();
} else {
const auto& typeVal = iter->getEdgeProp("*", nebula::kType);
if (UNLIKELY(!typeVal.isInt())) {
iter->erase();
continue;
}
auto type = typeVal.getInt();
if (biDirectEdgeTypes.find(type) != biDirectEdgeTypes.end()) {
if (type < 0 || findIter->second + 2 == currentStep) {
iter->erase();
} else {
iter->next();
}
} else {
iter->next();
}
}
} else {
if (currentStep == steps) {
iter->erase();
continue;
}
if (currentVids.emplace(dst, currentStep).second) {
Row row;
row.values.emplace_back(std::move(dst));
ds.rows.emplace_back(std::move(row));
}
iter->next();
}
}
iter->reset();
builder.iter(std::move(iter));
ectx_->setResult(output, builder.build());
ectx_->setResult(resultVar, builder.build());
// update historyVids
historyVids_.insert(std::make_move_iterator(currentVids.begin()),
std::make_move_iterator(currentVids.end()));
return finish(ResultBuilder().value(Value(std::move(ds))).build());
}

} // namespace graph
Expand Down
33 changes: 29 additions & 4 deletions src/graph/executor/algo/SubgraphExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,34 @@

#include "graph/executor/Executor.h"

// Subgraph receives the result from GetNeighbors.
// It has two main functions:
// First : extract the deduplicated destination VIDs from GetNeighbors
// Second: delete previously visited edges and save the result (iter) to the variable `resultVar`
//
// Member:
// `historyVids_` : a hash table
// KEY : the VID of the visited destination vertex
// VALUE : the number of steps taken to visit the KEY (the starting vertex is 0)
// Since each vertex is visited only once, a one-way edge expansion produces no
// duplicate edges. We only need to focus on the case of two-way expansion.
//
// How to delete edges:
// Determine whether a loop is formed by the number of steps. If the destination vid has been
// visited, and the step number of the destination vid differs by 2 from the current step, a
// loop is judged to have formed and the edge needs to be deleted.
//
// For example, given the topology below:
// a->c, a->b, b->a, b->c
// statement: get subgraph from 'a' both edge yield vertices as nodes, edges as relationships
// first step : a->b, a->c, a<-b; all edges need to be saved
// second step: b->a, b<-a, b->c, c<-a
// since it is a two-way expansion, the negative edges have already been visited,
// so b<-a & c<-a are deleted
// b->a : the step number of the destination vid `a` is 0, and the current step is 2. It can be
// determined that a loop is formed, so this edge also needs to be deleted.
// b->c : the step numbers show that no loop is formed, so keep it

namespace nebula {
namespace graph {
class SubgraphExecutor : public Executor {
Expand All @@ -18,10 +46,7 @@ class SubgraphExecutor : public Executor {
folly::Future<Status> execute() override;

private:
void oneMoreStep();

private:
std::unordered_set<Value> historyVids_;
std::unordered_map<Value, int64_t> historyVids_;
};

} // namespace graph
Expand Down
66 changes: 36 additions & 30 deletions src/graph/executor/query/DataCollectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,43 +61,49 @@ folly::Future<Status> DataCollectExecutor::doCollect() {
}

Status DataCollectExecutor::collectSubgraph(const std::vector<std::string>& vars) {
const auto* dc = asNode<DataCollect>(node());
const auto& colType = dc->colType();
DataSet ds;
ds.colNames = std::move(colNames_);
std::unordered_set<std::tuple<Value, EdgeType, EdgeRanking, Value>> uniqueEdges;
for (auto i = vars.begin(); i != vars.end(); ++i) {
const auto& hist = ectx_->getHistory(*i);
for (auto j = hist.begin(); j != hist.end(); ++j) {
if (i == vars.begin() && j == hist.end() - 1) {
continue;
}
auto iter = (*j).iter();
if (!iter->isGetNeighborsIter()) {
std::stringstream msg;
msg << "Iterator should be kind of GetNeighborIter, but was: " << iter->kind();
return Status::Error(msg.str());
}
List vertices;
List edges;
auto* gnIter = static_cast<GetNeighborsIter*>(iter.get());
auto originVertices = gnIter->getVertices();
for (auto& v : originVertices.values) {
if (UNLIKELY(!v.isVertex())) {
continue;
const auto& hist = ectx_->getHistory(vars[0]);
for (const auto& result : hist) {
auto iter = result.iter();
auto* gnIter = static_cast<GetNeighborsIter*>(iter.get());
List vertices;
List edges;
Row row;
bool notEmpty = false;
for (const auto& type : colType) {
if (type == Value::Type::VERTEX) {
auto originVertices = gnIter->getVertices();
vertices.reserve(originVertices.size());
for (auto& v : originVertices.values) {
if (UNLIKELY(!v.isVertex())) {
continue;
}
vertices.emplace_back(std::move(v));
}
vertices.emplace_back(std::move(v));
}
auto originEdges = gnIter->getEdges();
for (auto& edge : originEdges.values) {
if (UNLIKELY(!edge.isEdge())) {
continue;
if (!vertices.empty()) {
notEmpty = true;
row.emplace_back(std::move(vertices));
}
const auto& e = edge.getEdge();
auto edgeKey = std::make_tuple(e.src, e.type, e.ranking, e.dst);
if (uniqueEdges.emplace(std::move(edgeKey)).second) {
} else {
auto originEdges = gnIter->getEdges();
edges.reserve(originEdges.size());
for (auto& edge : originEdges.values) {
if (UNLIKELY(!edge.isEdge())) {
continue;
}
edges.emplace_back(std::move(edge));
}
if (!edges.empty()) {
notEmpty = true;
}
row.emplace_back(std::move(edges));
}
ds.rows.emplace_back(Row({std::move(vertices), std::move(edges)}));
}
if (notEmpty) {
ds.rows.emplace_back(std::move(row));
}
}
result_.setDataSet(std::move(ds));
Expand Down
17 changes: 3 additions & 14 deletions src/graph/executor/test/DataCollectTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ TEST_F(DataCollectTest, CollectSubgraph) {
auto* dc = DataCollect::make(qctx_.get(), DataCollect::DCKind::kSubgraph);
dc->setInputVars({"input_datasets"});
dc->setColNames(std::vector<std::string>{"_vertices", "_edges"});
dc->setColType({Value::Type::VERTEX, Value::Type::EDGE});

auto dcExe = std::make_unique<DataCollectExecutor>(dc, qctx_.get());
auto future = dcExe->execute();
Expand All @@ -179,29 +180,21 @@ TEST_F(DataCollectTest, CollectSubgraph) {
auto iter = hist[0].iter();
auto* gNIter = static_cast<GetNeighborsIter*>(iter.get());
Row row;
std::unordered_set<Value> vids;
std::unordered_set<std::tuple<Value, int64_t, int64_t, Value>> edgeKeys;
List vertices;
List edges;
auto originVertices = gNIter->getVertices();
for (auto& v : originVertices.values) {
if (!v.isVertex()) {
continue;
}
if (vids.emplace(v.getVertex().vid).second) {
vertices.emplace_back(std::move(v));
}
vertices.emplace_back(std::move(v));
}
auto originEdges = gNIter->getEdges();
for (auto& e : originEdges.values) {
if (!e.isEdge()) {
continue;
}
auto edgeKey =
std::make_tuple(e.getEdge().src, e.getEdge().type, e.getEdge().ranking, e.getEdge().dst);
if (edgeKeys.emplace(std::move(edgeKey)).second) {
edges.emplace_back(std::move(e));
}
edges.emplace_back(std::move(e));
}
row.values.emplace_back(std::move(vertices));
row.values.emplace_back(std::move(edges));
Expand Down Expand Up @@ -243,10 +236,6 @@ TEST_F(DataCollectTest, EmptyResult) {

DataSet expected;
expected.colNames = {"_vertices", "_edges"};
Row row;
row.values.emplace_back(Value(List()));
row.values.emplace_back(Value(List()));
expected.rows.emplace_back(std::move(row));
EXPECT_EQ(result.value().getDataSet(), expected);
EXPECT_EQ(result.state(), Result::State::kSuccess);
}
Expand Down
Loading