Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subgraph filter #4357

Merged
merged 13 commits into from
Oct 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/graph/context/ExecutionContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ const Result& ExecutionContext::getResult(const std::string& name) const {
}
}

void ExecutionContext::setVersionedResult(const std::string& name,
Result&& result,
int64_t version) {
auto it = valueMap_.find(name);
if (it != valueMap_.end()) {
auto& hist = it->second;
auto size = hist.size();
if (static_cast<size_t>(std::abs(version)) >= size) {
return;
}
hist[(size + version - 1) % size] = std::move(result);
}
}

const Result& ExecutionContext::getVersionedResult(const std::string& name, int64_t version) const {
auto& result = getHistory(name);
auto size = result.size();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/context/ExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class ExecutionContext {

const Result& getVersionedResult(const std::string& name, int64_t version) const;

void setVersionedResult(const std::string& name, Result&& result, int64_t version);

size_t numVersions(const std::string& name) const;

// Return all existing history of the value. The front is the latest value
Expand Down
27 changes: 22 additions & 5 deletions src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "graph/context/Iterator.h"

#include <robin_hood.h>

#include "common/datatypes/Edge.h"
#include "common/datatypes/Vertex.h"
#include "common/memory/MemoryUtils.h"
Expand Down Expand Up @@ -417,13 +419,13 @@ const Value& GetNeighborsIter::getEdgeProp(const std::string& edge, const std::s

auto& currentEdge = currentEdgeName();
if (edge != "*" && (currentEdge.compare(1, std::string::npos, edge) != 0)) {
VLOG(1) << "Current edge: " << currentEdgeName() << " Wanted: " << edge;
DLOG(INFO) << "Current edge: " << currentEdgeName() << " Wanted: " << edge;
return Value::kEmpty;
}
auto index = currentDs_->edgePropsMap.find(currentEdge);
if (index == currentDs_->edgePropsMap.end()) {
VLOG(1) << "No edge found: " << edge;
VLOG(1) << "Current edge: " << currentEdge;
DLOG(INFO) << "No edge found: " << edge;
DLOG(INFO) << "Current edge: " << currentEdge;
return Value::kEmpty;
}
auto propIndex = index->second.propIndices.find(prop);
Expand Down Expand Up @@ -468,6 +470,22 @@ Value GetNeighborsIter::getVertex(const std::string& name) const {
return Value(std::move(vertex));
}

std::vector<Value> GetNeighborsIter::vids() {
std::vector<Value> vids;
vids.reserve(numRows());
valid_ = true;
colIdx_ = -2;
for (currentDs_ = dsIndices_.begin(); currentDs_ < dsIndices_.end(); ++currentDs_) {
rowsUpperBound_ = currentDs_->ds->rows.end();
for (currentRow_ = currentDs_->ds->rows.begin(); currentRow_ < currentDs_->ds->rows.end();
++currentRow_) {
vids.emplace_back(getColumn(0));
}
}
reset();
return vids;
}

List GetNeighborsIter::getVertices() {
List vertices;
vertices.reserve(numRows());
Expand All @@ -478,7 +496,6 @@ List GetNeighborsIter::getVertices() {
for (currentRow_ = currentDs_->ds->rows.begin(); currentRow_ < currentDs_->ds->rows.end();
++currentRow_) {
vertices.values.emplace_back(getVertex());
VLOG(1) << "vertex: " << getVertex() << " size: " << vertices.size();
}
}
reset();
Expand Down Expand Up @@ -549,8 +566,8 @@ List GetNeighborsIter::getEdges() {
auto edge = getEdge();
if (edge.isEdge()) {
const_cast<Edge&>(edge.getEdge()).format();
edges.values.emplace_back(std::move(edge));
}
edges.values.emplace_back(std::move(edge));
}
reset();
return edges;
Expand Down
3 changes: 3 additions & 0 deletions src/graph/context/Iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ class GetNeighborsIter final : public Iterator {
// Its unique based on the plan
List getVertices();

// return start vids
std::vector<Value> vids();

// Its unique based on the GN interface dedup
List getEdges();

Expand Down
4 changes: 4 additions & 0 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,14 @@ struct SubgraphContext final : public AstContext {
Starts from;
StepClause steps;
std::string loopSteps;
Expression* filter{nullptr};
Expression* tagFilter{nullptr};
Expression* edgeFilter{nullptr};
std::vector<std::string> colNames;
std::unordered_set<EdgeType> edgeTypes;
std::unordered_set<EdgeType> biDirectEdgeTypes;
std::vector<Value::Type> colType;
ExpressionProps exprProps;
bool withProp{false};
bool getVertexProp{false};
bool getEdgeProp{false};
Expand Down
168 changes: 142 additions & 26 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,153 @@

#include "graph/executor/algo/SubgraphExecutor.h"

#include "graph/planner/plan/Algo.h"
#include "graph/service/GraphFlags.h"

using nebula::storage::StorageClient;
namespace nebula {
namespace graph {

folly::Future<Status> SubgraphExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto* subgraph = asNode<Subgraph>(node());
DataSet ds;
ds.colNames = subgraph->colNames();
totalSteps_ = subgraph_->steps();
auto iter = ectx_->getResult(subgraph_->inputVar()).iter();
auto res = buildRequestListByVidType(iter.get(), subgraph_->src(), true);
NG_RETURN_IF_ERROR(res);
vids_ = res.value();
if (vids_.empty()) {
DataSet emptyResult;
return finish(ResultBuilder().value(Value(std::move(emptyResult))).build());
}
return getNeighbors();
}

folly::Future<Status> SubgraphExecutor::getNeighbors() {
time::Duration getNbrTime;
StorageClient* storageClient = qctx_->getStorageClient();
StorageClient::CommonRequestParam param(subgraph_->space(),
qctx_->rctx()->session()->id(),
qctx_->plan()->id(),
qctx_->plan()->isProfileEnabled());

storage::cpp2::EdgeDirection edgeDirection{Direction::OUT_EDGE};
return storageClient
->getNeighbors(param,
{nebula::kVid},
std::move(vids_),
{},
edgeDirection,
nullptr,
subgraph_->vertexProps(),
subgraph_->edgeProps(),
nullptr,
false,
false,
{},
-1,
currentStep_ == 1 ? subgraph_->edgeFilter() : subgraph_->filter(),
currentStep_ == 1 ? nullptr : subgraph_->tagFilter())
.via(runner())
.thenValue([this, getNbrTime](RpcResponse&& resp) mutable {
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getNbrTime.elapsedInUSec()));
auto& hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
size_t size = 0u;
auto& result = resp.responses()[i];
if (result.vertices_ref().has_value()) {
size = (*result.vertices_ref()).size();
}
auto& info = hostLatency[i];
otherStats_.emplace(
folly::sformat("{} exec/total/vertices", std::get<0>(info).toString()),
folly::sformat("{}(us)/{}(us)/{},", std::get<1>(info), std::get<2>(info), size));
auto detail = getStorageDetail(result.result.latency_detail_us_ref());
if (!detail.empty()) {
otherStats_.emplace("storage_detail", detail);
}
}
vids_.clear();
return handleResponse(std::move(resp));
});
}

folly::Future<Status> SubgraphExecutor::handleResponse(RpcResponse&& resps) {
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
if (!result.ok()) {
return folly::makeFuture<Status>(std::move(result).status());
}

auto& responses = resps.responses();
List list;
for (auto& resp : responses) {
auto dataset = resp.get_vertices();
if (dataset == nullptr) {
continue;
}
list.values.emplace_back(std::move(*dataset));
}

auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);

auto steps = totalSteps_;
if (!subgraph_->oneMoreStep()) {
--steps;
}

uint32_t steps = subgraph->steps();
const auto& currentStepVal = ectx_->getValue(subgraph->currentStepVar());
DCHECK(currentStepVal.isInt());
auto currentStep = currentStepVal.getInt();
auto resultVar = subgraph->resultVar();
if (!process(std::move(iter)) || ++currentStep_ > steps) {
filterEdges(0);
return folly::makeFuture<Status>(Status::OK());
} else {
return getNeighbors();
}
}

auto iter = ectx_->getResult(subgraph->inputVar()).iter();
auto gnSize = iter->size();
void SubgraphExecutor::filterEdges(int version) {
auto iter = ectx_->getVersionedResult(subgraph_->outputVar(), version).iter();
auto* gnIter = static_cast<GetNeighborsIter*>(iter.get());
while (gnIter->valid()) {
const auto& dst = gnIter->getEdgeProp("*", nebula::kDst);
if (validVids_.find(dst) == validVids_.end()) {
auto edge = gnIter->getEdge();
gnIter->erase();
} else {
gnIter->next();
}
}
gnIter->reset();
ResultBuilder builder;
builder.iter(std::move(iter));
ectx_->setVersionedResult(subgraph_->outputVar(), builder.build(), version);
}

bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
auto gnSize = iter->numRows();
if (gnSize == 0) {
return false;
}

ResultBuilder builder;
builder.value(iter->valuePtr());

robin_hood::unordered_flat_map<Value, int64_t, std::hash<Value>> currentVids;
HashMap currentVids;
currentVids.reserve(gnSize);
historyVids_.reserve(historyVids_.size() + gnSize);
if (currentStep == 1) {
for (; iter->valid(); iter->next()) {
const auto& src = iter->getColumn(nebula::kVid);
currentVids.emplace(src, 0);
auto startVids = iter->vids();
if (currentStep_ == 1) {
for (auto& startVid : startVids) {
currentVids.emplace(startVid, 0);
}
iter->reset();
}
auto& biDirectEdgeTypes = subgraph->biDirectEdgeTypes();
validVids_.insert(std::make_move_iterator(startVids.begin()),
std::make_move_iterator(startVids.end()));
auto& biDirectEdgeTypes = subgraph_->biDirectEdgeTypes();
while (iter->valid()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
if (dst.empty()) {
// no edge, dst is empty
iter->next();
continue;
}
auto findIter = historyVids_.find(dst);
if (findIter != historyVids_.end()) {
if (biDirectEdgeTypes.empty()) {
Expand All @@ -52,7 +163,7 @@ folly::Future<Status> SubgraphExecutor::execute() {
}
auto type = typeVal.getInt();
if (biDirectEdgeTypes.find(type) != biDirectEdgeTypes.end()) {
if (type < 0 || findIter->second + 2 == currentStep) {
if (type < 0 || findIter->second + 2 == currentStep_) {
iter->erase();
} else {
iter->next();
Expand All @@ -62,25 +173,30 @@ folly::Future<Status> SubgraphExecutor::execute() {
}
}
} else {
if (currentStep == steps) {
if (currentStep_ == totalSteps_) {
iter->erase();
continue;
}
if (currentVids.emplace(dst, currentStep).second) {
Row row;
row.values.emplace_back(std::move(dst));
ds.rows.emplace_back(std::move(row));
if (currentVids.emplace(dst, currentStep_).second) {
// next vids for getNeighbor
vids_.emplace_back(std::move(dst));
}
iter->next();
}
}
iter->reset();
builder.iter(std::move(iter));
ectx_->setResult(resultVar, builder.build());
finish(builder.build());
// update historyVids
historyVids_.insert(std::make_move_iterator(currentVids.begin()),
std::make_move_iterator(currentVids.end()));
return finish(ResultBuilder().value(Value(std::move(ds))).build());
if (currentStep_ != 1 && subgraph_->tagFilter()) {
filterEdges(-1);
}
if (vids_.empty()) {
return false;
}
return true;
}

} // namespace graph
Expand Down
31 changes: 27 additions & 4 deletions src/graph/executor/algo/SubgraphExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

#include <robin_hood.h>

#include "graph/executor/Executor.h"
#include "graph/executor/StorageAccessExecutor.h"
#include "graph/planner/plan/Algo.h"

// Subgraph receive result from GetNeighbors
// There are two Main functions
Expand Down Expand Up @@ -39,15 +40,37 @@

namespace nebula {
namespace graph {
class SubgraphExecutor : public Executor {
using RpcResponse = storage::StorageRpcResponse<storage::cpp2::GetNeighborsResponse>;

class SubgraphExecutor : public StorageAccessExecutor {
public:
using HashMap = robin_hood::unordered_flat_map<Value, size_t, std::hash<Value>>;
using HashSet = robin_hood::unordered_flat_set<Value, std::hash<Value>>;

SubgraphExecutor(const PlanNode* node, QueryContext* qctx)
: Executor("SubgraphExecutor", node, qctx) {}
: StorageAccessExecutor("SubgraphExecutor", node, qctx) {
subgraph_ = asNode<Subgraph>(node);
}

folly::Future<Status> execute() override;

folly::Future<Status> getNeighbors();

bool process(std::unique_ptr<GetNeighborsIter> iter);

// filter out edges that do not meet the conditions in the previous step
void filterEdges(int version);

folly::Future<Status> handleResponse(RpcResponse&& resps);

private:
robin_hood::unordered_flat_map<Value, int64_t, std::hash<Value>> historyVids_;
HashMap historyVids_;
const Subgraph* subgraph_{nullptr};
size_t currentStep_{1};
size_t totalSteps_{1};
std::vector<Value> vids_;
// save vids already visited
HashSet validVids_;
};

} // namespace graph
Expand Down
Loading