Skip to content

Commit

Permalink
refactor traverse output (#5464)
Browse files Browse the repository at this point in the history
* refactor traverse output

* fix pruneproperties error & none_direct_dst

* fix test error

* fix shortest path
  • Loading branch information
nevermore3 authored Apr 4, 2023
1 parent 05d01b1 commit 6eaa621
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 70 deletions.
9 changes: 8 additions & 1 deletion src/common/function/FunctionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2017,7 +2017,7 @@ FunctionManager::FunctionManager() {
// More information about the encoding can be found in `NebulaKeyUtils.h`
auto &attr = functions_["none_direct_dst"];
attr.minArity_ = 1;
attr.maxArity_ = 1;
attr.maxArity_ = 2;
attr.isAlwaysPure_ = true;
attr.body_ = [](const auto &args) -> Value {
switch (args[0].get().type()) {
Expand All @@ -2035,6 +2035,13 @@ FunctionManager::FunctionManager() {
case Value::Type::LIST: {
const auto &listVal = args[0].get().getList().values;
if (listVal.empty()) {
if (args.size() == 2) {
if (args[1].get().type() == Value::Type::VERTEX) {
const auto &v = args[1].get().getVertex();
return v.vid;
}
return Value::kNullBadType;
}
return Value::kNullBadType;
}
auto &lastVal = listVal.back();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ struct EdgeInfo {
MatchEdge::Direction direction{MatchEdge::Direction::OUT_EDGE};
std::vector<std::string> types;
std::string alias;
// used to construct the path struct
std::string innerAlias;
const MapExpression* props{nullptr};
Expression* filter{nullptr};
};
Expand Down
80 changes: 58 additions & 22 deletions src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace graph {

folly::Future<Status> TraverseExecutor::execute() {
range_ = traverse_->stepRange();
genPath_ = traverse_->genPath();
NG_RETURN_IF_ERROR(buildRequestVids());
if (vids_.empty()) {
DataSet emptyDs;
Expand Down Expand Up @@ -374,19 +375,25 @@ std::vector<Row> TraverseExecutor::buildZeroStepPath() {
for (auto& p : prevPaths) {
Row row = p;
List edgeList;
edgeList.values.emplace_back(vertex);
row.values.emplace_back(vertex);
row.values.emplace_back(std::move(edgeList));
row.values.emplace_back(edgeList);
if (genPath_) {
edgeList.values.emplace_back(vertex);
row.values.emplace_back(std::move(edgeList));
}
result.emplace_back(std::move(row));
}
}
} else {
for (auto& vertex : initVertices_) {
Row row;
List edgeList;
edgeList.values.emplace_back(vertex);
row.values.emplace_back(vertex);
row.values.emplace_back(std::move(edgeList));
row.values.emplace_back(edgeList);
if (genPath_) {
edgeList.values.emplace_back(vertex);
row.values.emplace_back(std::move(edgeList));
}
result.emplace_back(std::move(row));
}
}
Expand Down Expand Up @@ -477,46 +484,61 @@ std::vector<Row> TraverseExecutor::buildPath(const Value& initVertex,
return std::vector<Row>();
}

std::vector<Row> result;
result.reserve(adjEdges.size());
std::vector<Row> oneStepPath;
oneStepPath.reserve(adjEdges.size());
for (auto& edge : adjEdges) {
List edgeList;
edgeList.values.emplace_back(edge);
Row row;
row.values.emplace_back(src);
row.values.emplace_back(std::move(edgeList));
result.emplace_back(std::move(row));
// contains only edges
row.values.emplace_back(edgeList);
if (genPath_) {
// contains both nodes and edges
row.values.emplace_back(std::move(edgeList));
}
oneStepPath.emplace_back(std::move(row));
}

if (maxStep == 1) {
if (traverse_->trackPrevPath()) {
return joinPrevPath(initVertex, result);
return joinPrevPath(initVertex, oneStepPath);
}
return result;
return oneStepPath;
}

size_t step = 2;
std::vector<Row> newResult;
std::queue<std::vector<Value>*> queue;
std::queue<std::vector<Value>*> edgeListQueue;
std::list<std::unique_ptr<std::vector<Value>>> holder;
for (auto& edge : adjEdges) {
auto ptr = std::make_unique<std::vector<Value>>(std::vector<Value>({edge}));
queue.emplace(ptr.get());
edgeListQueue.emplace(ptr.get());
holder.emplace_back(std::move(ptr));
}
size_t adjSize = queue.size();
while (!queue.empty()) {
auto edgeListPtr = queue.front();

size_t adjSize = edgeListQueue.size();
while (!edgeListQueue.empty()) {
auto edgeListPtr = edgeListQueue.front();
auto& dst = edgeListPtr->back().getEdge().dst;
queue.pop();
edgeListQueue.pop();

std::vector<Value>* vertexEdgeListPtr = nullptr;
if (genPath_) {
vertexEdgeListPtr = queue.front();
queue.pop();
}

--adjSize;
auto dstIter = adjList_.find(dst);
if (dstIter == adjList_.end()) {
if (adjSize == 0) {
if (++step > maxStep) {
break;
}
adjSize = queue.size();
adjSize = edgeListQueue.size();
}
continue;
}
Expand All @@ -527,29 +549,44 @@ std::vector<Row> TraverseExecutor::buildPath(const Value& initVertex,
continue;
}
auto newEdgeListPtr = std::make_unique<std::vector<Value>>(*edgeListPtr);
newEdgeListPtr->emplace_back(dstIter->first);
newEdgeListPtr->emplace_back(edge);

std::unique_ptr<std::vector<Value>> newVertexEdgeListPtr = nullptr;
if (genPath_) {
newVertexEdgeListPtr = std::make_unique<std::vector<Value>>(*vertexEdgeListPtr);
newVertexEdgeListPtr->emplace_back(dstIter->first);
newVertexEdgeListPtr->emplace_back(edge);
}

if (step >= minStep) {
Row row;
row.values.emplace_back(src);
// contains only edges
row.values.emplace_back(List(*newEdgeListPtr));
if (genPath_) {
// contains both nodes and edges
row.values.emplace_back(List(*newVertexEdgeListPtr));
}
newResult.emplace_back(std::move(row));
}
queue.emplace(newEdgeListPtr.get());
edgeListQueue.emplace(newEdgeListPtr.get());
holder.emplace_back(std::move(newEdgeListPtr));
if (genPath_ && newVertexEdgeListPtr != nullptr) {
queue.emplace(newVertexEdgeListPtr.get());
holder.emplace_back(std::move(newVertexEdgeListPtr));
}
}
if (adjSize == 0) {
if (++step > maxStep) {
break;
}
adjSize = queue.size();
adjSize = edgeListQueue.size();
}
}
if (minStep <= 1) {
newResult.insert(newResult.begin(),
std::make_move_iterator(result.begin()),
std::make_move_iterator(result.end()));
std::make_move_iterator(oneStepPath.begin()),
std::make_move_iterator(oneStepPath.end()));
}
if (traverse_->trackPrevPath()) {
return joinPrevPath(initVertex, newResult);
Expand All @@ -570,8 +607,7 @@ std::vector<Row> TraverseExecutor::joinPrevPath(const Value& initVertex,
if (!hasSameEdgeInPath(prevPath, p)) {
// copy
Row row = prevPath;
row.values.emplace_back(p.values.front());
row.values.emplace_back(p.values.back());
row.values.insert(row.values.end(), p.values.begin(), p.values.end());
newPaths.emplace_back(std::move(row));
}
}
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/query/TraverseExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class TraverseExecutor final : public StorageAccessExecutor {
private:
ObjectPool objPool_;

bool genPath_{false};
VidHashSet vids_;
std::vector<Value> initVertices_;
DataSet result_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ StatusOr<OptRule::TransformResult> RemoveAppendVerticesBelowJoinRule::transform(
auto& avNodeAlias = appendVertices->nodeAlias();

auto& tvEdgeAlias = traverse->edgeAlias();
auto& tvNodeAlias = traverse->nodeAlias();

auto& leftExprs = join->hashKeys();
auto& rightExprs = join->probeKeys();
Expand Down Expand Up @@ -148,6 +149,7 @@ StatusOr<OptRule::TransformResult> RemoveAppendVerticesBelowJoinRule::transform(
// and let the new left/inner join use it as right expr
auto* args = ArgumentList::make(pool);
args->addArgument(InputPropertyExpression::make(pool, tvEdgeAlias));
args->addArgument(InputPropertyExpression::make(pool, tvNodeAlias));
auto* newPrjExpr = FunctionCallExpression::make(pool, "none_direct_dst", args);

auto oldYieldColumns = project->columns()->columns();
Expand Down
22 changes: 16 additions & 6 deletions src/graph/planner/match/MatchPathPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ MatchPathPlanner::MatchPathPlanner(CypherClauseContextBase* ctx, const Path& pat
// Builds the output column names for a Traverse plan node.
//
// Resulting order:
//   1. every name in `inputCols`, only when `trackPrev` is true;
//   2. `node.alias`;
//   3. `edge.alias`;
//   4. `edge.innerAlias`, only when `genPath` is true.
//
// NOTE(review): this listing looks like a diff rendered without +/- markers, so
// both the earlier parameter-list ending (`bool trackPrev) {`) and the later one
// (`bool trackPrev, bool genPath = false) {`) appear below — confirm against the
// repository before compiling.
static std::vector<std::string> genTraverseColNames(const std::vector<std::string>& inputCols,
const NodeInfo& node,
const EdgeInfo& edge,
bool trackPrev) {
bool trackPrev,
bool genPath = false) {
std::vector<std::string> cols;
// Carry the upstream columns forward only when the previous path is tracked.
if (trackPrev) {
cols = inputCols;
}
cols.emplace_back(node.alias);
cols.emplace_back(edge.alias);
// Extra column, named by the edge's inner alias, requested via genPath.
if (genPath) {
cols.emplace_back(edge.innerAlias);
}
return cols;
}

Expand All @@ -47,9 +51,12 @@ static std::vector<std::string> genAppendVColNames(const std::vector<std::string
return cols;
}

// Builds a call expression to the function named "none_direct_dst" whose
// arguments are input-property references to the `edge.alias` column and then
// the `node.alias` column. All expression objects are created through `pool`.
// The callers visible in this listing assign the result to `nextTraverseStart`.
//
// NOTE(review): this listing looks like a diff rendered without +/- markers, so
// both the two-parameter and the three-parameter signature lines appear below —
// confirm against the repository before compiling.
static Expression* genNextTraverseStart(ObjectPool* pool, const EdgeInfo& edge) {
static Expression* genNextTraverseStart(ObjectPool* pool,
const EdgeInfo& edge,
const NodeInfo& node) {
auto args = ArgumentList::make(pool);
// First argument: the edge column.
args->addArgument(InputPropertyExpression::make(pool, edge.alias));
// Second argument: the node column.
args->addArgument(InputPropertyExpression::make(pool, node.alias));
return FunctionCallExpression::make(pool, "none_direct_dst", args);
}

Expand Down Expand Up @@ -218,13 +225,15 @@ Status MatchPathPlanner::leftExpandFromNode(size_t startIndex, SubPlan& subplan)
traverse->setEdgeDirection(edge.direction);
traverse->setStepRange(stepRange);
traverse->setDedup();
traverse->setGenPath(path_.genPath);
// If start from end of the path pattern, the first traverse would not
// track the previous path, otherwise, it should.
bool trackPrevPath = (startIndex + 1 == nodeInfos.size() ? i != startIndex : true);
traverse->setTrackPrevPath(trackPrevPath);
traverse->setColNames(genTraverseColNames(subplan.root->colNames(), node, edge, trackPrevPath));
traverse->setColNames(
genTraverseColNames(subplan.root->colNames(), node, edge, trackPrevPath, path_.genPath));
subplan.root = traverse;
nextTraverseStart = genNextTraverseStart(qctx->objPool(), edge);
nextTraverseStart = genNextTraverseStart(qctx->objPool(), edge, node);
if (expandInto) {
// TODO(shylock) optimize to embed filter to Traverse
auto* startVid = nodeId(qctx->objPool(), dst);
Expand Down Expand Up @@ -290,10 +299,11 @@ Status MatchPathPlanner::rightExpandFromNode(size_t startIndex, SubPlan& subplan
traverse->setStepRange(stepRange);
traverse->setDedup();
traverse->setTrackPrevPath(i != startIndex);
traverse->setGenPath(path_.genPath);
traverse->setColNames(
genTraverseColNames(subplan.root->colNames(), node, edge, i != startIndex));
genTraverseColNames(subplan.root->colNames(), node, edge, i != startIndex, path_.genPath));
subplan.root = traverse;
nextTraverseStart = genNextTraverseStart(qctx->objPool(), edge);
nextTraverseStart = genNextTraverseStart(qctx->objPool(), edge, node);
if (expandInto) {
auto* startVid = nodeId(qctx->objPool(), dst);
auto* endVid = nextTraverseStart;
Expand Down
27 changes: 15 additions & 12 deletions src/graph/planner/match/MatchSolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,21 +231,17 @@ static YieldColumn* buildVertexColumn(ObjectPool* pool, const std::string& alias
return new YieldColumn(InputPropertyExpression::make(pool, alias), alias);
}

// Builds the YieldColumn that projects one edge of a match path.
//
// In the three-parameter form, the input column read and the output column name
// are both `edge.innerAlias` when `genPath` is true and `edge.alias` otherwise.
// When `edge.range` is null the expression is element 0 of that input column;
// otherwise it is the input column itself.
// Returns a YieldColumn allocated with `new`; who releases it is not visible here.
//
// NOTE(review): this listing looks like a diff rendered without +/- markers, so
// the earlier variant (two-parameter signature, `edge.alias` lookups, the
// "is_edge" list-comprehension filter, `edge.alias` as column name) is
// interleaved with the later one — confirm against the repository before compiling.
static YieldColumn* buildEdgeColumn(QueryContext* qctx, const EdgeInfo& edge) {
static YieldColumn* buildEdgeColumn(QueryContext* qctx, const EdgeInfo& edge, bool genPath) {
Expression* expr = nullptr;
// Pick which alias names both the source column and the projected column.
const std::string& edgeName = genPath ? edge.innerAlias : edge.alias;
auto pool = qctx->objPool();
if (edge.range == nullptr) {
// No step range on the edge: take the first element of the column.
expr = SubscriptExpression::make(
pool, InputPropertyExpression::make(pool, edge.alias), ConstantExpression::make(pool, 0));
pool, InputPropertyExpression::make(pool, edgeName), ConstantExpression::make(pool, 0));
} else {
// Earlier variant: keep only the elements for which is_edge(...) holds.
auto innerVar = qctx->vctx()->anonVarGen()->getVar();
auto* args = ArgumentList::make(pool);
args->addArgument(VariableExpression::makeInner(pool, innerVar));
auto* filter = FunctionCallExpression::make(pool, "is_edge", args);
expr = ListComprehensionExpression::make(
pool, innerVar, InputPropertyExpression::make(pool, edge.alias), filter);
// Later variant: pass the whole column through unchanged.
expr = InputPropertyExpression::make(pool, edgeName);
}
return new YieldColumn(expr, edge.alias);
return new YieldColumn(expr, edgeName);
}

// static
Expand All @@ -262,16 +258,23 @@ void MatchSolver::buildProjectColumns(QueryContext* qctx, const Path& path, SubP
}
};

auto addEdge = [columns, &colNames, qctx](auto& edgeInfo) {
auto addEdge = [columns, &colNames, qctx](auto& edgeInfo, bool genPath = false) {
if (!edgeInfo.alias.empty() && !edgeInfo.anonymous) {
columns->addColumn(buildEdgeColumn(qctx, edgeInfo));
colNames.emplace_back(edgeInfo.alias);
columns->addColumn(buildEdgeColumn(qctx, edgeInfo, genPath));
if (genPath) {
colNames.emplace_back(edgeInfo.innerAlias);
} else {
colNames.emplace_back(edgeInfo.alias);
}
}
};

for (size_t i = 0; i < edgeInfos.size(); i++) {
addNode(nodeInfos[i]);
addEdge(edgeInfos[i]);
if (path.genPath) {
addEdge(edgeInfos[i], true);
}
}

// last vertex
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/plan/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ void Traverse::cloneMembers(const Traverse& g) {
if (g.tagFilter_ != nullptr) {
setTagFilter(g.tagFilter_->clone());
}
genPath_ = g.genPath();
}

std::unique_ptr<PlanNodeDescription> Traverse::explain() const {
Expand Down
9 changes: 9 additions & 0 deletions src/graph/planner/plan/Query.h
Original file line number Diff line number Diff line change
Expand Up @@ -1713,10 +1713,18 @@ class Traverse final : public GetNeighbors {
firstStepFilter_ = filter;
}

// Sets the genPath flag of this Traverse node. The executor shown in this
// listing reads it via genPath() and, when it is true, appends one extra list
// value (holding both vertices and edges) to every output row.
void setGenPath(bool genPath) {
genPath_ = genPath;
}

// Returns the stored tag filter expression; nullptr until setTagFilter() is
// called with a non-null value (the member is initialised to nullptr).
Expression* tagFilter() const {
return tagFilter_;
}

// Returns the genPath flag; false unless changed through setGenPath() or
// copied from another node by cloneMembers().
bool genPath() const {
return genPath_;
}

// Stores the given tag filter expression pointer as-is (no copy is made here;
// cloneMembers() passes in a clone when duplicating a node).
void setTagFilter(Expression* tagFilter) {
tagFilter_ = tagFilter;
}
Expand All @@ -1738,6 +1746,7 @@ class Traverse final : public GetNeighbors {
// Push down filter in first step
Expression* firstStepFilter_{nullptr};
Expression* tagFilter_{nullptr};
bool genPath_{false};
};

// Append vertices to a path.
Expand Down
Loading

0 comments on commit 6eaa621

Please sign in to comment.