Skip to content

Commit

Permalink
fix error
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Feb 15, 2022
1 parent f5412d3 commit 2645327
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 60 deletions.
67 changes: 25 additions & 42 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ folly::Future<Status> SubgraphExecutor::execute() {
DCHECK(currentStepVal.isInt());
auto currentStep = currentStepVal.getInt();
VLOG(1) << "Current Step is: " << currentStep << " Total Steps is: " << steps;

if (currentStep == steps) {
oneMoreStep();
return finish(ResultBuilder().value(Value(std::move(ds))).build());
}
auto resultVar = subgraph->resultVar();

VLOG(1) << "input: " << subgraph->inputVar() << " output: " << node()->outputVar();
auto iter = ectx_->getResult(subgraph->inputVar()).iter();
std::unordered_set<Value> currentVids;

ResultBuilder builder;
builder.value(iter->valuePtr());

std::unordered_map<Value, int64_t> currentVids;
currentVids.reserve(iter->size());
historyVids_.reserve(historyVids_.size() + iter->size());
if (currentStep == 1) {
for (; iter->valid(); iter->next()) {
const auto& src = iter->getColumn(nebula::kVid);
currentVids.emplace(src);
currentVids.emplace(src, 0);
}
iter->reset();
}
Expand All @@ -46,15 +46,28 @@ folly::Future<Status> SubgraphExecutor::execute() {
if (biDirectEdgeTypes.empty()) {
iter->next();
} else {
const auto& type = iter->getEdgeProp("*", nebula::kType);
if (type.isInt() && biDirectEdgeTypes.find(type.getInt()) != biDirectEdgeTypes.end()) {
const auto& typeVal = iter->getEdgeProp("*", nebula::kType);
if (UNLIKELY(!typeVal.isInt())) {
iter->erase();
continue;
}
auto type = typeVal.getInt();
if (biDirectEdgeTypes.find(type) != biDirectEdgeTypes.end()) {
if (type < 0 || historyVids_[dst] + 2 == currentStep) {
iter->erase();
} else {
iter->next();
}
} else {
iter->next();
}
}
} else {
if (currentVids.emplace(dst).second) {
if (currentStep == steps) {
iter->erase();
continue;
}
if (currentVids.emplace(dst, currentStep).second) {
Row row;
row.values.emplace_back(std::move(dst));
ds.rows.emplace_back(std::move(row));
Expand All @@ -63,44 +76,14 @@ folly::Future<Status> SubgraphExecutor::execute() {
}
}
iter->reset();
builder.iter(std::move(iter));
ectx_->setResult(resultVar, builder.build());
// update historyVids
historyVids_.insert(std::make_move_iterator(currentVids.begin()),
std::make_move_iterator(currentVids.end()));
VLOG(1) << "Next step vid is : " << ds;
return finish(ResultBuilder().value(Value(std::move(ds))).build());
}

void SubgraphExecutor::oneMoreStep() {
auto* subgraph = asNode<Subgraph>(node());
auto output = subgraph->oneMoreStepOutput();
VLOG(1) << "OneMoreStep Input: " << subgraph->inputVar() << " Output: " << output;
auto iter = ectx_->getResult(subgraph->inputVar()).iter();
DCHECK(iter && iter->isGetNeighborsIter());
auto& biDirectEdgeTypes = subgraph->biDirectEdgeTypes();

ResultBuilder builder;
builder.value(iter->valuePtr());
while (iter->valid()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
if (historyVids_.find(dst) != historyVids_.end()) {
if (biDirectEdgeTypes.empty()) {
iter->next();
} else {
const auto& type = iter->getEdgeProp("*", nebula::kType);
if (type.isInt() && biDirectEdgeTypes.find(type.getInt()) != biDirectEdgeTypes.end()) {
iter->erase();
} else {
iter->next();
}
}
} else {
iter->erase();
}
}
iter->reset();
builder.iter(std::move(iter));
ectx_->setResult(output, builder.build());
}

} // namespace graph
} // namespace nebula
5 changes: 1 addition & 4 deletions src/graph/executor/algo/SubgraphExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ class SubgraphExecutor : public Executor {
folly::Future<Status> execute() override;

private:
void oneMoreStep();

private:
std::unordered_set<Value> historyVids_;
std::unordered_map<Value, int64_t> historyVids_;
};

} // namespace graph
Expand Down
3 changes: 0 additions & 3 deletions src/graph/executor/query/DataCollectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ Status DataCollectExecutor::collectSubgraph(const std::vector<std::string>& vars
for (auto i = vars.begin(); i != vars.end(); ++i) {
const auto& hist = ectx_->getHistory(*i);
for (auto j = hist.begin(); j != hist.end(); ++j) {
if (i == vars.begin() && j == hist.end() - 1) {
continue;
}
auto iter = (*j).iter();
auto* gnIter = static_cast<GetNeighborsIter*>(iter.get());
List vertices;
Expand Down
6 changes: 3 additions & 3 deletions src/graph/planner/ngql/SubgraphPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ StatusOr<SubPlan> SubgraphPlanner::nSteps(SubPlan& startVidPlan, const std::stri
gn->setEdgeProps(std::move(edgeProps).value());
gn->setInputVar(input);

auto oneMoreStepOutput = qctx->vctx()->anonVarGen()->getVar();
auto resultVar = qctx->vctx()->anonVarGen()->getVar();
auto loopSteps = qctx->vctx()->anonVarGen()->getVar();
subgraphCtx_->loopSteps = loopSteps;
auto* subgraph = Subgraph::make(qctx, gn, oneMoreStepOutput, loopSteps, steps.steps() + 1);
auto* subgraph = Subgraph::make(qctx, gn, resultVar, loopSteps, steps.steps() + 1);
subgraph->setOutputVar(input);
subgraph->setBiDirectEdgeTypes(subgraphCtx_->biDirectEdgeTypes);
subgraph->setColNames({nebula::kVid});
Expand All @@ -81,7 +81,7 @@ StatusOr<SubPlan> SubgraphPlanner::nSteps(SubPlan& startVidPlan, const std::stri

auto* dc = DataCollect::make(qctx, DataCollect::DCKind::kSubgraph);
dc->addDep(loop);
dc->setInputVars({gn->outputVar(), oneMoreStepOutput});
dc->setInputVars({resultVar});
dc->setColNames({"VERTICES", "EDGES"});

auto* project = Project::make(qctx, dc, subgraphCtx_->yieldExpr);
Expand Down
15 changes: 7 additions & 8 deletions src/graph/planner/plan/Algo.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,14 @@ class Subgraph final : public SingleInputNode {
public:
static Subgraph* make(QueryContext* qctx,
PlanNode* input,
const std::string& oneMoreStepOutput,
const std::string& resultVar,
const std::string& currentStepVar,
uint32_t steps) {
return qctx->objPool()->add(
new Subgraph(qctx, input, oneMoreStepOutput, currentStepVar, steps));
return qctx->objPool()->add(new Subgraph(qctx, input, resultVar, currentStepVar, steps));
}

const std::string& oneMoreStepOutput() const {
return oneMoreStepOutput_;
const std::string& resultVar() const {
return resultVar_;
}

const std::string& currentStepVar() const {
Expand All @@ -164,15 +163,15 @@ class Subgraph final : public SingleInputNode {
private:
Subgraph(QueryContext* qctx,
PlanNode* input,
const std::string& oneMoreStepOutput,
const std::string& resultVar,
const std::string& currentStepVar,
uint32_t steps)
: SingleInputNode(qctx, Kind::kSubgraph, input),
oneMoreStepOutput_(oneMoreStepOutput),
resultVar_(resultVar),
currentStepVar_(currentStepVar),
steps_(steps) {}

std::string oneMoreStepOutput_;
std::string resultVar_;
std::string currentStepVar_;
uint32_t steps_;
std::unordered_set<EdgeType> biDirectEdgeTypes_;
Expand Down

0 comments on commit 2645327

Please sign in to comment.