Skip to content

Commit

Permalink
subgraph filter (#4357)
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 authored Oct 1, 2022
1 parent 03d55db commit d67cc4b
Show file tree
Hide file tree
Showing 23 changed files with 596 additions and 143 deletions.
14 changes: 14 additions & 0 deletions src/graph/context/ExecutionContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ const Result& ExecutionContext::getResult(const std::string& name) const {
}
}

// Overwrites one stored version of the variable `name` with `result`.
//
// The slot is selected as (size + version - 1) % size, where size is the
// number of versions currently stored for `name`; e.g. version 0 addresses
// index size - 1 and version -1 addresses index size - 2.
//
// The call is a silent no-op when `name` is unknown or when |version| is not
// strictly smaller than the number of stored versions (which also covers an
// empty history).
void ExecutionContext::setVersionedResult(const std::string& name,
                                          Result&& result,
                                          int64_t version) {
  const auto entry = valueMap_.find(name);
  if (entry == valueMap_.end()) {
    return;
  }

  auto& history = entry->second;
  const auto count = history.size();
  const bool inRange = static_cast<size_t>(std::abs(version)) < count;
  if (inRange) {
    history[(count + version - 1) % count] = std::move(result);
  }
}

const Result& ExecutionContext::getVersionedResult(const std::string& name, int64_t version) const {
auto& result = getHistory(name);
auto size = result.size();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/context/ExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class ExecutionContext {

const Result& getVersionedResult(const std::string& name, int64_t version) const;

void setVersionedResult(const std::string& name, Result&& result, int64_t version);

size_t numVersions(const std::string& name) const;

// Return all existing history of the value. The front is the latest value
Expand Down
27 changes: 22 additions & 5 deletions src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "graph/context/Iterator.h"

#include <robin_hood.h>

#include "common/datatypes/Edge.h"
#include "common/datatypes/Vertex.h"
#include "common/memory/MemoryUtils.h"
Expand Down Expand Up @@ -417,13 +419,13 @@ const Value& GetNeighborsIter::getEdgeProp(const std::string& edge, const std::s

auto& currentEdge = currentEdgeName();
if (edge != "*" && (currentEdge.compare(1, std::string::npos, edge) != 0)) {
VLOG(1) << "Current edge: " << currentEdgeName() << " Wanted: " << edge;
DLOG(INFO) << "Current edge: " << currentEdgeName() << " Wanted: " << edge;
return Value::kEmpty;
}
auto index = currentDs_->edgePropsMap.find(currentEdge);
if (index == currentDs_->edgePropsMap.end()) {
VLOG(1) << "No edge found: " << edge;
VLOG(1) << "Current edge: " << currentEdge;
DLOG(INFO) << "No edge found: " << edge;
DLOG(INFO) << "Current edge: " << currentEdge;
return Value::kEmpty;
}
auto propIndex = index->second.propIndices.find(prop);
Expand Down Expand Up @@ -468,6 +470,22 @@ Value GetNeighborsIter::getVertex(const std::string& name) const {
return Value(std::move(vertex));
}

// Returns getColumn(0) evaluated once for every row of every dataset tracked in
// dsIndices_, in iteration order (the class header describes the result as the
// "start vids"). The iterator's cursor members are repurposed for the walk and
// reset() is called before returning.
std::vector<Value> GetNeighborsIter::vids() {
std::vector<Value> vids;
// One entry is appended per visited row; numRows() is used as the capacity hint.
vids.reserve(numRows());
valid_ = true;
// NOTE(review): -2 is presumably the sentinel for "no edge column selected" — confirm
// against the iterator's column-index conventions.
colIdx_ = -2;
// Walk the member cursors (currentDs_, currentRow_) directly over all datasets and rows;
// presumably getColumn() reads the row through these members — confirm.
for (currentDs_ = dsIndices_.begin(); currentDs_ < dsIndices_.end(); ++currentDs_) {
rowsUpperBound_ = currentDs_->ds->rows.end();
for (currentRow_ = currentDs_->ds->rows.begin(); currentRow_ < currentDs_->ds->rows.end();
++currentRow_) {
vids.emplace_back(getColumn(0));
}
}
// The loops leave the cursor members past the end, so the iterator is reset before returning.
reset();
return vids;
}

List GetNeighborsIter::getVertices() {
List vertices;
vertices.reserve(numRows());
Expand All @@ -478,7 +496,6 @@ List GetNeighborsIter::getVertices() {
for (currentRow_ = currentDs_->ds->rows.begin(); currentRow_ < currentDs_->ds->rows.end();
++currentRow_) {
vertices.values.emplace_back(getVertex());
VLOG(1) << "vertex: " << getVertex() << " size: " << vertices.size();
}
}
reset();
Expand Down Expand Up @@ -549,8 +566,8 @@ List GetNeighborsIter::getEdges() {
auto edge = getEdge();
if (edge.isEdge()) {
const_cast<Edge&>(edge.getEdge()).format();
edges.values.emplace_back(std::move(edge));
}
edges.values.emplace_back(std::move(edge));
}
reset();
return edges;
Expand Down
3 changes: 3 additions & 0 deletions src/graph/context/Iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ class GetNeighborsIter final : public Iterator {
// It's unique based on the plan
List getVertices();

// return start vids
std::vector<Value> vids();

// It's unique based on the GN interface dedup
List getEdges();

Expand Down
4 changes: 4 additions & 0 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,14 @@ struct SubgraphContext final : public AstContext {
Starts from;
StepClause steps;
std::string loopSteps;
Expression* filter{nullptr};
Expression* tagFilter{nullptr};
Expression* edgeFilter{nullptr};
std::vector<std::string> colNames;
std::unordered_set<EdgeType> edgeTypes;
std::unordered_set<EdgeType> biDirectEdgeTypes;
std::vector<Value::Type> colType;
ExpressionProps exprProps;
bool withProp{false};
bool getVertexProp{false};
bool getEdgeProp{false};
Expand Down
168 changes: 142 additions & 26 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,153 @@

#include "graph/executor/algo/SubgraphExecutor.h"

#include "graph/planner/plan/Algo.h"
#include "graph/service/GraphFlags.h"

using nebula::storage::StorageClient;
namespace nebula {
namespace graph {

// Entry point of the executor.
//
// Records the total number of steps from the plan node, builds the list of
// start vertex ids from the input variable, and kicks off the first
// getNeighbors() round. If there are no start vertices the executor finishes
// immediately with an empty DataSet.
//
// Returns the future produced by finish() or getNeighbors(); an error status
// from building the vid list is returned through NG_RETURN_IF_ERROR.
folly::Future<Status> SubgraphExecutor::execute() {
  SCOPED_TIMER(&execTime_);
  totalSteps_ = subgraph_->steps();
  auto iter = ectx_->getResult(subgraph_->inputVar()).iter();
  auto res = buildRequestListByVidType(iter.get(), subgraph_->src(), true);
  NG_RETURN_IF_ERROR(res);
  // `res` is not used again, so move the vid list out instead of copying it.
  vids_ = std::move(res).value();
  if (vids_.empty()) {
    DataSet emptyResult;
    return finish(ResultBuilder().value(Value(std::move(emptyResult))).build());
  }
  return getNeighbors();
}

// Issues one getNeighbors RPC for the vertex ids currently held in vids_
// (out-edge direction) and chains handleResponse() onto the reply.
//
// On the first step the edge filter is sent and no tag filter; on later steps
// the general filter and the tag filter are sent. When the reply arrives,
// timing and per-host statistics are recorded in otherStats_, vids_ is cleared,
// and the response is handed to handleResponse().
folly::Future<Status> SubgraphExecutor::getNeighbors() {
  time::Duration rpcTimer;
  StorageClient* client = qctx_->getStorageClient();
  StorageClient::CommonRequestParam reqParam(subgraph_->space(),
                                             qctx_->rctx()->session()->id(),
                                             qctx_->plan()->id(),
                                             qctx_->plan()->isProfileEnabled());

  storage::cpp2::EdgeDirection direction{Direction::OUT_EDGE};

  // Filters pushed down to storage depend on whether this is the first step.
  const bool firstStep = currentStep_ == 1;
  auto pushedFilter = firstStep ? subgraph_->edgeFilter() : subgraph_->filter();
  auto pushedTagFilter = firstStep ? nullptr : subgraph_->tagFilter();

  return client
      ->getNeighbors(reqParam,
                     {nebula::kVid},
                     std::move(vids_),
                     {},
                     direction,
                     nullptr,
                     subgraph_->vertexProps(),
                     subgraph_->edgeProps(),
                     nullptr,
                     false,
                     false,
                     {},
                     -1,
                     pushedFilter,
                     pushedTagFilter)
      .via(runner())
      .thenValue([this, rpcTimer](RpcResponse&& resp) mutable {
        otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", rpcTimer.elapsedInUSec()));

        // One statistics entry per host that answered.
        auto& latencies = resp.hostLatency();
        for (size_t idx = 0; idx < latencies.size(); ++idx) {
          auto& hostResp = resp.responses()[idx];
          const size_t vertexCount =
              hostResp.vertices_ref().has_value() ? (*hostResp.vertices_ref()).size() : 0u;

          auto& hostInfo = latencies[idx];
          otherStats_.emplace(
              folly::sformat("{} exec/total/vertices", std::get<0>(hostInfo).toString()),
              folly::sformat(
                  "{}(us)/{}(us)/{},", std::get<1>(hostInfo), std::get<2>(hostInfo), vertexCount));

          auto storageDetail = getStorageDetail(hostResp.result.latency_detail_us_ref());
          if (!storageDetail.empty()) {
            otherStats_.emplace("storage_detail", storageDetail);
          }
        }

        vids_.clear();
        return handleResponse(std::move(resp));
      });
}

// Consumes one getNeighbors reply.
//
// Fails fast with the completeness status if the reply is not acceptable.
// Otherwise the per-host vertex datasets are gathered into a List, wrapped in a
// GetNeighborsIter and passed to process(). Traversal stops — after a final
// filterEdges(0) — when process() returns false or when the step counter
// exceeds the step limit; otherwise the next getNeighbors() round is started.
//
// The step limit is totalSteps_, reduced by one when the plan node does not
// request one more step.
folly::Future<Status> SubgraphExecutor::handleResponse(RpcResponse&& resps) {
  auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
  if (!result.ok()) {
    return folly::makeFuture<Status>(std::move(result).status());
  }

  auto& responses = resps.responses();
  List list;
  list.reserve(responses.size());
  for (auto& resp : responses) {
    auto dataset = resp.get_vertices();
    if (dataset == nullptr) {
      // This host returned no vertices.
      continue;
    }
    list.values.emplace_back(std::move(*dataset));
  }

  auto listVal = std::make_shared<Value>(std::move(list));
  auto iter = std::make_unique<GetNeighborsIter>(listVal);

  auto steps = totalSteps_;
  if (!subgraph_->oneMoreStep()) {
    --steps;
  }

  // Note: currentStep_ is only incremented when process() returned true.
  if (!process(std::move(iter)) || ++currentStep_ > steps) {
    filterEdges(0);
    return folly::makeFuture<Status>(Status::OK());
  }
  return getNeighbors();
}

auto iter = ectx_->getResult(subgraph->inputVar()).iter();
auto gnSize = iter->size();
// Removes, from the stored result of the output variable at `version`, every
// row whose edge destination is not contained in validVids_, then writes the
// filtered iterator back to the same version slot.
//
// version: forwarded unchanged to getVersionedResult() / setVersionedResult().
void SubgraphExecutor::filterEdges(int version) {
  auto iter = ectx_->getVersionedResult(subgraph_->outputVar(), version).iter();
  auto* gnIter = static_cast<GetNeighborsIter*>(iter.get());
  while (gnIter->valid()) {
    const auto& dst = gnIter->getEdgeProp("*", nebula::kDst);
    if (validVids_.find(dst) == validVids_.end()) {
      // erase() is not followed by next(): the loop re-checks valid() right after it.
      gnIter->erase();
    } else {
      gnIter->next();
    }
  }
  gnIter->reset();
  ResultBuilder builder;
  builder.iter(std::move(iter));
  ectx_->setVersionedResult(subgraph_->outputVar(), builder.build(), version);
}

bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
auto gnSize = iter->numRows();
if (gnSize == 0) {
return false;
}

ResultBuilder builder;
builder.value(iter->valuePtr());

robin_hood::unordered_flat_map<Value, int64_t, std::hash<Value>> currentVids;
HashMap currentVids;
currentVids.reserve(gnSize);
historyVids_.reserve(historyVids_.size() + gnSize);
if (currentStep == 1) {
for (; iter->valid(); iter->next()) {
const auto& src = iter->getColumn(nebula::kVid);
currentVids.emplace(src, 0);
auto startVids = iter->vids();
if (currentStep_ == 1) {
for (auto& startVid : startVids) {
currentVids.emplace(startVid, 0);
}
iter->reset();
}
auto& biDirectEdgeTypes = subgraph->biDirectEdgeTypes();
validVids_.insert(std::make_move_iterator(startVids.begin()),
std::make_move_iterator(startVids.end()));
auto& biDirectEdgeTypes = subgraph_->biDirectEdgeTypes();
while (iter->valid()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
if (dst.empty()) {
// no edge, dst is empty
iter->next();
continue;
}
auto findIter = historyVids_.find(dst);
if (findIter != historyVids_.end()) {
if (biDirectEdgeTypes.empty()) {
Expand All @@ -52,7 +163,7 @@ folly::Future<Status> SubgraphExecutor::execute() {
}
auto type = typeVal.getInt();
if (biDirectEdgeTypes.find(type) != biDirectEdgeTypes.end()) {
if (type < 0 || findIter->second + 2 == currentStep) {
if (type < 0 || findIter->second + 2 == currentStep_) {
iter->erase();
} else {
iter->next();
Expand All @@ -62,25 +173,30 @@ folly::Future<Status> SubgraphExecutor::execute() {
}
}
} else {
if (currentStep == steps) {
if (currentStep_ == totalSteps_) {
iter->erase();
continue;
}
if (currentVids.emplace(dst, currentStep).second) {
Row row;
row.values.emplace_back(std::move(dst));
ds.rows.emplace_back(std::move(row));
if (currentVids.emplace(dst, currentStep_).second) {
// next vids for getNeighbor
vids_.emplace_back(std::move(dst));
}
iter->next();
}
}
iter->reset();
builder.iter(std::move(iter));
ectx_->setResult(resultVar, builder.build());
finish(builder.build());
// update historyVids
historyVids_.insert(std::make_move_iterator(currentVids.begin()),
std::make_move_iterator(currentVids.end()));
return finish(ResultBuilder().value(Value(std::move(ds))).build());
if (currentStep_ != 1 && subgraph_->tagFilter()) {
filterEdges(-1);
}
if (vids_.empty()) {
return false;
}
return true;
}

} // namespace graph
Expand Down
31 changes: 27 additions & 4 deletions src/graph/executor/algo/SubgraphExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

#include <robin_hood.h>

#include "graph/executor/Executor.h"
#include "graph/executor/StorageAccessExecutor.h"
#include "graph/planner/plan/Algo.h"

// Subgraph receives the result from GetNeighbors
// There are two main functions
Expand Down Expand Up @@ -39,15 +40,37 @@

namespace nebula {
namespace graph {
class SubgraphExecutor : public Executor {
using RpcResponse = storage::StorageRpcResponse<storage::cpp2::GetNeighborsResponse>;

class SubgraphExecutor : public StorageAccessExecutor {
public:
using HashMap = robin_hood::unordered_flat_map<Value, size_t, std::hash<Value>>;
using HashSet = robin_hood::unordered_flat_set<Value, std::hash<Value>>;

SubgraphExecutor(const PlanNode* node, QueryContext* qctx)
: Executor("SubgraphExecutor", node, qctx) {}
: StorageAccessExecutor("SubgraphExecutor", node, qctx) {
subgraph_ = asNode<Subgraph>(node);
}

folly::Future<Status> execute() override;

folly::Future<Status> getNeighbors();

bool process(std::unique_ptr<GetNeighborsIter> iter);

// filter out edges that do not meet the conditions in the previous step
void filterEdges(int version);

folly::Future<Status> handleResponse(RpcResponse&& resps);

private:
robin_hood::unordered_flat_map<Value, int64_t, std::hash<Value>> historyVids_;
HashMap historyVids_;
const Subgraph* subgraph_{nullptr};
size_t currentStep_{1};
size_t totalSteps_{1};
std::vector<Value> vids_;
// save vids already visited
HashSet validVids_;
};

} // namespace graph
Expand Down
Loading

0 comments on commit d67cc4b

Please sign in to comment.