Skip to content

Commit

Permalink
fix error
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Jul 4, 2022
1 parent cf5f707 commit 114708a
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 36 deletions.
24 changes: 15 additions & 9 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

#include "graph/service/GraphFlags.h"

using nebula::storage::StorageClient;
namespace nebula {
namespace graph {

folly::Future<Status> SubgraphExecutor::execute() {
SCOPED_TIMER(&execTime_);
totalSteps_ = subgraph_->steps();
startVids_ = buildRequestDataSet();
if (startVids_.rows.empty()) {
DataSet emptyResult;
Expand All @@ -21,8 +23,7 @@ folly::Future<Status> SubgraphExecutor::execute() {

DataSet SubgraphExecutor::buildRequestDataSet() {
auto inputVar = subgraph_->inputVar();
auto inputIter = ectx_->getResult(inputVar).iter();
auto iter = static_cast<SequentialIter*>(inputIter.get());
auto iter = ectx_->getResult(inputVar).iter();
return buildRequestDataSetByVidType(iter.get(), subgraph_->src(), true);
}

Expand Down Expand Up @@ -51,7 +52,7 @@ folly::Future<Status> SubgraphExecutor::getNeighbors() {
-1,
currentStep_ == 1 ? subgraph_->edgeFilter() : subgraph_->filter())
.via(runner())
.thenValue([this, getNbrTime](StorageRpcResponse<GetNeighborsResponse>&& resp) mutable {
.thenValue([this, getNbrTime](RpcResponse&& resp) mutable {
// addStats(resp, getNbrTime.elapsedInUSec());
return handleResponse(std::move(resp));
});
Expand All @@ -75,14 +76,20 @@ folly::Future<Status> SubgraphExecutor::handleResponse(RpcResponse&& resps) {
auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);

if (!process(iter.get()) || ++currentStep_ > totalSteps_) {
auto steps = totalSteps_;
if (!subgraph_->oneMoreStep()) {
--steps;
}
startVids_.rows.clear();

if (!process(std::move(iter)) || ++currentStep_ > steps) {
return folly::makeFuture<Status>(Status::OK());
} else {
return getNeighbors();
}
}

bool SubgraphExecutor::process(GetNeighborsIter* iter) {
bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
auto& rows = startVids_.rows;
auto gnSize = iter->size();
if (gnSize == 0) {
Expand Down Expand Up @@ -140,16 +147,15 @@ bool SubgraphExecutor::process(GetNeighborsIter* iter) {
iter->next();
}
}
if (rows.empty()) {
return false;
}

iter->reset();
builder.iter(std::move(iter));
finish(builder.build());
// update historyVids
historyVids_.insert(std::make_move_iterator(currentVids.begin()),
std::make_move_iterator(currentVids.end()));
if (rows.empty()) {
return false;
}
return true;
}

Expand Down
6 changes: 3 additions & 3 deletions src/graph/executor/algo/SubgraphExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#ifndef GRAPH_EXECUTOR_ALGO_SUBGRAPHEXECUTOR_H_
#define GRAPH_EXECUTOR_ALGO_SUBGRAPHEXECUTOR_H_

#include "graph/executor/Executor.h"
#include "graph/executor/StorageAccessExecutor.h"
#include "graph/planner/plan/Algo.h"

// Subgraph receive result from GetNeighbors
Expand Down Expand Up @@ -53,7 +53,7 @@ class SubgraphExecutor : public StorageAccessExecutor {

folly::Future<Status> getNeighbors();

bool process(GetNeighborsIter* iter);
bool process(std::unique_ptr<GetNeighborsIter> iter);

folly::Future<Status> handleResponse(RpcResponse&& resps);

Expand All @@ -62,7 +62,7 @@ class SubgraphExecutor : public StorageAccessExecutor {
size_t currentStep_{1};
size_t totalSteps_{1};
DataSet startVids_;
std::unordered_map<Value, int64_t> historyVids_;
std::unordered_map<Value, size_t> historyVids_;
};

} // namespace graph
Expand Down
26 changes: 12 additions & 14 deletions src/graph/planner/ngql/SubgraphPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ StatusOr<std::unique_ptr<std::vector<VertexProp>>> SubgraphPlanner::buildVertexP

StatusOr<std::unique_ptr<std::vector<EdgeProp>>> SubgraphPlanner::buildEdgeProps() {
auto* qctx = subgraphCtx_->qctx;
bool getEdgeProp = subgraphCtx_->withProp && subgraphCtx_->getEdgeProp;
const auto& space = subgraphCtx_->space;
auto& edgeTypes = subgraphCtx_->edgeTypes;
auto& exprProps = subgraphCtx_->exprProps;
Expand All @@ -46,25 +45,25 @@ StatusOr<std::unique_ptr<std::vector<EdgeProp>>> SubgraphPlanner::buildEdgeProps
edgePropsPtr->reserve(edgeTypes.size());
for (const auto& edgeType : edgeTypes) {
EdgeProp ep;
ep.set_type(edgeType);
ep.type_ref() = edgeType;
const auto& found = edgeProps.find(std::abs(edgeType));
if (found != edgeProps.end()) {
std::set<std::string> props(found->second.begin(), found->second.end());
props.emplace(kType);
props.emplace(kRank);
props.emplace(kDst);
ep.set_props(std::vector<std::string>(props.begin(), props.end()));
ep.props_ref() = std::vector<std::string>(props.begin(), props.end());
} else {
ep.set_props({kType, kRank, kDst});
ep.props_ref() = std::vector<std::string>({kType, kRank, kDst});
}
edgePropsPtr->emplace_back(std::move(ep));
}
return edgePropsPtr;
} else {
bool getEdgeProp = subgraphCtx_->withProp && subgraphCtx_->getEdgeProp;
std::vector<EdgeType> vEdgeTypes(edgeTypes.begin(), edgeTypes.end());
return SchemaUtil::getEdgeProps(qctx, space, std::move(vEdgeTypes), getEdgeProp);
}
std::vector<EdgeType> vEdgeTypes(edgeTypes.begin(), edgeTypes.end());
auto edgeProps = SchemaUtil::getEdgeProps(qctx, space, std::move(vEdgeTypes), getEdgeProp);
NG_RETURN_IF_ERROR(edgeProps);
return edgeProps;
}

StatusOr<SubPlan> SubgraphPlanner::nSteps(SubPlan& startVidPlan, const std::string& input) {
Expand All @@ -78,25 +77,24 @@ StatusOr<SubPlan> SubgraphPlanner::nSteps(SubPlan& startVidPlan, const std::stri
auto edgeProps = buildEdgeProps();
NG_RETURN_IF_ERROR(edgeProps);

uint32_t maxSteps = steps.steps();
if (subgraphCtx_->getEdgeProp || subgraphCtx_->withProp || !dstTagProps.empty()) {
++maxSteps;
}

auto* subgraph = Subgraph::make(qctx,
startVidPlan.root,
space.id,
subgraphCtx_->from.src,
subgraphCtx_->edgeFilter,
subgraphCtx_->filter,
maxSteps);
steps.steps() + 1);
subgraph->setVertexProps(std::move(vertexProps).value());
subgraph->setEdgeProps(std::move(edgeProps).value());
subgraph->setInputVar(input);
subgraph->setBiDirectEdgeTypes(subgraphCtx_->biDirectEdgeTypes);
if (subgraphCtx_->getEdgeProp || subgraphCtx_->withProp || !dstTagProps.empty()) {
subgraph->setOneMoreStep();
}

auto* dc = DataCollect::make(qctx, DataCollect::DCKind::kSubgraph);
dc->addDep(subgraph);
dc->setInputVars({subgraph->outputVar()});
dc->setColType(std::move(subgraphCtx_->colType));
dc->setColNames(subgraphCtx_->colNames);

Expand Down
3 changes: 2 additions & 1 deletion src/graph/planner/plan/Algo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ std::unique_ptr<PlanNodeDescription> Subgraph::explain() const {
"vertexProps", vertexProps_ ? folly::toJson(util::toJson(*vertexProps_)) : "", desc.get());
addDescription(
"edgeProps", edgeProps_ ? folly::toJson(util::toJson(*edgeProps_)) : "", desc.get());
addDescription("steps", folly::toJson(util::toJson(steps_)), desc.get());
addDescription(
"steps", folly::toJson(util::toJson(oneMoreStep_ ? steps_ : steps_ - 1)), desc.get());
return desc;
}

Expand Down
27 changes: 18 additions & 9 deletions src/graph/planner/plan/Algo.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,19 +260,19 @@ class Subgraph final : public SingleInputNode {
static Subgraph* make(QueryContext* qctx,
PlanNode* input,
GraphSpaceID space,
const Expression* src,
Expression* src,
const Expression* edgeFilter,
const Expression* filter,
uint32_t steps) {
size_t steps) {
return qctx->objPool()->makeAndAdd<Subgraph>(
qctx, input, space, src, edgeFilter, filter, steps);
qctx, input, space, DCHECK_NOTNULL(src), edgeFilter, filter, steps);
}

GraphSpaceID space() const {
return space_;
}

const Expression* src() const {
Expression* src() const {
return src_;
}

Expand All @@ -284,10 +284,14 @@ class Subgraph final : public SingleInputNode {
return filter_;
}

const uint32_t steps() const {
size_t steps() const {
return steps_;
}

bool oneMoreStep() const {
return oneMoreStep_;
}

const std::unordered_set<EdgeType> biDirectEdgeTypes() const {
return biDirectEdgeTypes_;
}
Expand All @@ -300,6 +304,10 @@ class Subgraph final : public SingleInputNode {
return vertexProps_.get();
}

void setOneMoreStep() {
oneMoreStep_ = true;
}

void setBiDirectEdgeTypes(std::unordered_set<EdgeType> edgeTypes) {
biDirectEdgeTypes_ = std::move(edgeTypes);
}
Expand All @@ -319,10 +327,10 @@ class Subgraph final : public SingleInputNode {
Subgraph(QueryContext* qctx,
PlanNode* input,
GraphSpaceID space,
const Expression* src,
Expression* src,
const Expression* edgeFilter,
const Expression* filter,
uint32_t steps)
size_t steps)
: SingleInputNode(qctx, Kind::kSubgraph, input),
space_(space),
src_(src),
Expand All @@ -332,10 +340,11 @@ class Subgraph final : public SingleInputNode {

GraphSpaceID space_;
// vertices may be parsing from runtime.
const Expression* src_{nullptr};
Expression* src_{nullptr};
const Expression* edgeFilter_{nullptr};
const Expression* filter_{nullptr};
uint32_t steps_{1};
size_t steps_{1};
bool oneMoreStep_{false};
std::unordered_set<EdgeType> biDirectEdgeTypes_;
std::unique_ptr<std::vector<VertexProp>> vertexProps_;
std::unique_ptr<std::vector<EdgeProp>> edgeProps_;
Expand Down

0 comments on commit 114708a

Please sign in to comment.