Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize-all-path-go #5528

Merged
merged 5 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ struct GoContext final : AstContext {
bool joinInput{false};
// true when $$.tag.prop exist
bool joinDst{false};
bool isSimple{false};

ExpressionProps exprProps;

Expand Down
143 changes: 95 additions & 48 deletions src/graph/executor/algo/AllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ DEFINE_uint32(
100,
"the number of vids to expand, when this threshold is exceeded, use heuristic expansion");
DEFINE_uint32(path_threshold_ratio, 2, "threshold for heuristics expansion");
DEFINE_uint32(path_batch_size, 5000, "number of paths constructed by each thread");
DEFINE_uint32(path_batch_size, 50000, "number of paths constructed by each thread");

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -156,13 +156,16 @@ folly::Future<Status> AllPathsExecutor::getNeighbors(bool reverse) {
}
auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);
time::Duration buildAdjTime;
auto key = folly::sformat("buildAdjTime {}step[{}]", reverse ? "reverse " : "", step);
if (reverse) {
rightNextStepVids_.clear();
expandFromRight(iter.get());
} else {
leftNextStepVids_.clear();
expandFromLeft(iter.get());
}
otherStats_.emplace(key, folly::sformat("{}(us)", buildAdjTime.elapsedInUSec()));
return Status::OK();
});
}
Expand Down Expand Up @@ -261,6 +264,7 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
// if key exists, discard the right adjacency's key & values
// because the right adjacency list may have fewer edges
// a->c->o, a->b, c->f, f->o
time::Duration mergeAdjTime;
for (auto& rAdj : rightAdjList_) {
auto& src = rAdj.first;
auto iter = leftAdjList_.find(src);
Expand Down Expand Up @@ -292,71 +296,106 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
}
}
}
otherStats_.emplace("merge_adj_time", folly::sformat("{}(us)", mergeAdjTime.elapsedInUSec()));
time::Duration buildPathTime;
auto future = buildPathMultiJobs();
return future.via(runner()).thenValue([this](auto&& resp) {
UNUSED(resp);
if (!withProp_ || emptyPropVids_.empty()) {
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
}
return getPathProps();
});
return future.via(runner())
.ensure([this, buildPathTime]() {
otherStats_.emplace("build_path_time",
folly::sformat("{}(us)", buildPathTime.elapsedInUSec()));
})
.thenValue([this](auto&& resp) {
UNUSED(resp);
if (!withProp_ || emptyPropVids_.empty()) {
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
}
return getPathProps();
});
}

folly::Future<Status> AllPathsExecutor::buildPathMultiJobs() {
auto pathsPtr = std::make_shared<std::vector<std::vector<Value>>>();
auto pathsPtr = std::make_shared<std::vector<NPath*>>();
if (threadLocalPtr_.get() == nullptr) {
threadLocalPtr_.reset(new std::deque<NPath>());
}
for (auto& vid : leftInitVids_) {
auto vidIter = leftAdjList_.find(vid);
if (vidIter == leftAdjList_.end()) {
continue;
}
auto src = vidIter->first;
auto& src = vidIter->first;
auto& adjEdges = vidIter->second;
if (adjEdges.empty()) {
continue;
}
pathsPtr->reserve(adjEdges.size() + pathsPtr->size());
for (auto& edge : adjEdges) {
pathsPtr->emplace_back(std::vector<Value>({src, edge}));
threadLocalPtr_->emplace_back(NPath(src, edge));
pathsPtr->emplace_back(&threadLocalPtr_->back());
}
}
size_t step = 2;
auto future = doBuildPath(step, 0, pathsPtr->size(), pathsPtr);
return future.via(runner()).thenValue([this](std::vector<Row>&& paths) {
return future.via(runner()).thenValue([this](std::vector<std::pair<NPath*, Value>>&& paths) {
memory::MemoryCheckGuard guard;

if (!paths.empty()) {
result_.rows.swap(paths);
time::Duration convertPathTime;
for (auto& path : paths) {
result_.rows.emplace_back(convertNPath2Row(path.first, path.second));
}
otherStats_.emplace("convert_path_time",
folly::sformat("{}(us)", convertPathTime.elapsedInUSec()));
}
return Status::OK();
});
}

folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<std::vector<Value>>> pathsPtr) {
if (cnt_.load(std::memory_order_relaxed) >= limit_) {
return folly::makeFuture<std::vector<Row>>(std::vector<Row>());
// construct ROW[src1, [e1, v2, e2], v3]
Row AllPathsExecutor::convertNPath2Row(NPath* path, Value dst) {
std::vector<Value> list;
NPath* head = path;
while (head != nullptr) {
list.emplace_back(head->edge);
list.emplace_back(head->vertex);
head = head->p;
}
Row row;
// add src;
row.values.emplace_back(list.back());
list.pop_back();
std::reverse(list.begin(), list.end());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need to reverse the list?

Copy link
Contributor Author

@nevermore3 nevermore3 Apr 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when recursively parsing vertex and edge through NPath, the order is reversed compared to the topology of the path

List edgeList(std::move(list));
row.values.emplace_back(std::move(edgeList));
row.values.emplace_back(std::move(dst));
return row;
}

folly::Future<std::vector<std::pair<AllPathsExecutor::NPath*, Value>>>
AllPathsExecutor::doBuildPath(size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<NPath*>> pathsPtr) {
if (cnt_.load(std::memory_order_relaxed) >= limit_) {
return folly::makeFuture<std::vector<std::pair<NPath*, Value>>>(
std::vector<std::pair<NPath*, Value>>());
}
if (threadLocalPtr_.get() == nullptr) {
threadLocalPtr_.reset(new std::deque<NPath>());
}
auto& adjList = leftAdjList_;
auto currentPathPtr = std::make_unique<std::vector<Row>>();
auto newPathsPtr = std::make_shared<std::vector<std::vector<Value>>>();
auto currentStepResult = std::make_unique<std::vector<std::pair<NPath*, Value>>>();
auto newPathsPtr = std::make_shared<std::vector<NPath*>>();

for (auto i = start; i < end; ++i) {
auto& path = (*pathsPtr)[i];
auto& edgeValue = path.back();
auto path = (*pathsPtr)[i];
auto& edgeValue = path->edge;
DCHECK(edgeValue.isEdge());
auto& dst = edgeValue.getEdge().dst;
auto dstIter = rightInitVids_.find(dst);
if (dstIter != rightInitVids_.end()) {
Row row;
row.values.emplace_back(path.front());
List edgeList(std::vector<Value>(path.begin() + 1, path.end()));
row.values.emplace_back(std::move(edgeList));
row.values.emplace_back(*dstIter);
currentPathPtr->emplace_back(std::move(row));
currentStepResult->emplace_back(std::make_pair(path, *dstIter));
++cnt_;
if (cnt_.load(std::memory_order_relaxed) >= limit_) {
break;
Expand All @@ -379,20 +418,17 @@ folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
continue;
}
}
// copy
auto newPath = path;
newPath.emplace_back(adjIter->first);
newPath.emplace_back(edge);
newPathsPtr->emplace_back(std::move(newPath));
threadLocalPtr_->emplace_back(NPath(path, adjIter->first, edge));
newPathsPtr->emplace_back(&threadLocalPtr_->back());
}
}
}

auto newPathsSize = newPathsPtr->size();
if (step > maxStep_ || newPathsSize == 0) {
return folly::makeFuture<std::vector<Row>>(std::move(*currentPathPtr));
return folly::makeFuture<std::vector<std::pair<NPath*, Value>>>(std::move(*currentStepResult));
}
std::vector<folly::Future<std::vector<Row>>> futures;
std::vector<folly::Future<std::vector<std::pair<NPath*, Value>>>> futures;
if (newPathsSize < FLAGS_path_batch_size) {
futures.emplace_back(folly::via(runner(), [this, step, newPathsSize, newPathsPtr]() {
return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr);
Expand All @@ -407,9 +443,10 @@ folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
}
}
return folly::collect(futures).via(runner()).thenValue(
[pathPtr = std::move(currentPathPtr)](std::vector<std::vector<Row>>&& paths) {
[pathPtr = std::move(currentStepResult)](
std::vector<std::vector<std::pair<NPath*, Value>>>&& paths) {
memory::MemoryCheckGuard guard;
std::vector<Row> result = std::move(*pathPtr);
std::vector<std::pair<NPath*, Value>> result = std::move(*pathPtr);
for (auto& path : paths) {
if (path.empty()) {
continue;
Expand Down Expand Up @@ -441,19 +478,29 @@ folly::Future<Status> AllPathsExecutor::getPathProps() {
});
}

bool AllPathsExecutor::hasSameVertices(const std::vector<Value>& edgeList, const Edge& edge) {
bool AllPathsExecutor::hasSameEdge(NPath* path, const Edge& edge) {
NPath* head = path;
while (head != nullptr) {
if (edge == head->edge) {
return true;
}
head = head->p;
}
return false;
}

bool AllPathsExecutor::hasSameVertices(NPath* path, const Edge& edge) {
if (edge.src == edge.dst) {
return true;
}
auto& vid = edge.dst;
auto iter = edgeList.begin() + 1;
for (; iter != edgeList.end(); iter++) {
if (iter->isEdge()) {
auto& edgeVal = iter->getEdge();
if (edgeVal.src == vid) {
return true;
}
NPath* head = path;
while (head != nullptr) {
auto& vertex = head->vertex;
if (vertex.getVertex().vid == vid) {
return true;
}
head = head->p;
}
return false;
}
Expand Down
29 changes: 22 additions & 7 deletions src/graph/executor/algo/AllPathsExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#ifndef GRAPH_EXECUTOR_ALGO_ALLPATHSEXECUTOR_H_
#define GRAPH_EXECUTOR_ALGO_ALLPATHSEXECUTOR_H_

#include "folly/ThreadLocal.h"
#include "graph/executor/StorageAccessExecutor.h"

// Using the two-way BFS algorithm, a heuristic algorithm is used in the expansion process
Expand All @@ -26,10 +27,10 @@
// when expanding, if the vid has already been visited, do not visit again
// leftAdjList_ save result of forward expansion
// rightAdjList_ save result of backward expansion

namespace nebula {
namespace graph {
class AllPaths;
struct NPath;
class AllPathsExecutor final : public StorageAccessExecutor {
public:
AllPathsExecutor(const PlanNode* node, QueryContext* qctx)
Expand All @@ -46,6 +47,17 @@ class AllPathsExecutor final : public StorageAccessExecutor {
template <typename T = Value>
using VertexMap = std::unordered_map<Value, std::vector<T>, VertexHash, VertexEqual>;

struct NPath {
NPath* p{nullptr};
const Value& vertex;
const Value& edge;
NPath(const Value& v, const Value& e) : vertex(v), edge(e) {}
NPath(NPath* path, const Value& v, const Value& e) : p(path), vertex(v), edge(e) {}
NPath(NPath&& v) noexcept : p(v.p), vertex(std::move(v.vertex)), edge(std::move(v.edge)) {}
NPath(const NPath& v) : p(v.p), vertex(v.vertex), edge(v.edge) {}
~NPath() {}
};

private:
void buildRequestVids(bool reverse);

Expand All @@ -59,19 +71,20 @@ class AllPathsExecutor final : public StorageAccessExecutor {

void expandFromRight(GetNeighborsIter* iter);

folly::Future<std::vector<Row>> doBuildPath(
size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<std::vector<Value>>> edgeLists);
Row convertNPath2Row(NPath* path, Value dst);

folly::Future<std::vector<std::pair<NPath*, Value>>> doBuildPath(
size_t step, size_t start, size_t end, std::shared_ptr<std::vector<NPath*>> paths);

folly::Future<Status> getPathProps();

folly::Future<Status> buildPathMultiJobs();

folly::Future<Status> buildResult();

bool hasSameVertices(const std::vector<Value>& edgeList, const Edge& edge);
bool hasSameEdge(NPath* path, const Edge& edge);

bool hasSameVertices(NPath* path, const Edge& edge);

private:
const AllPaths* pathNode_{nullptr};
Expand All @@ -94,6 +107,8 @@ class AllPathsExecutor final : public StorageAccessExecutor {

DataSet result_;
std::vector<Value> emptyPropVids_;
class NewTag {};
folly::ThreadLocalPtr<std::deque<NPath>, NewTag> threadLocalPtr_;
};
} // namespace graph
} // namespace nebula
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/query/ExpandExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ folly::Future<Status> ExpandExecutor::GetDstBySrc() {
size = (*result.dsts_ref()).size();
}
auto info = util::collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info));
otherStats_.emplace(folly::sformat("step{} resp [{}]", currentStep_, i),
folly::toPrettyJson(info));
}
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
if (!result.ok()) {
Expand Down
30 changes: 30 additions & 0 deletions src/graph/planner/ngql/GoPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,33 @@ PlanNode* GoPlanner::buildJoinDstPlan(PlanNode* dep) {
return join;
}

SubPlan GoPlanner::doSimplePlan() {
auto qctx = goCtx_->qctx;
size_t step = goCtx_->steps.mSteps();
auto* expand = Expand::make(qctx,
startNode_,
goCtx_->space.id,
false, // random
step,
buildEdgeProps(true));
expand->setEdgeTypes(buildEdgeTypes());
expand->setColNames({"_expand_vid"});
expand->setInputVar(goCtx_->vidsVar);

auto* dedup = Dedup::make(qctx, expand);

auto pool = qctx->objPool();
auto* newYieldExpr = pool->makeAndAdd<YieldColumns>();
newYieldExpr->addColumn(new YieldColumn(ColumnExpression::make(pool, 0)));
auto* project = Project::make(qctx, dedup, newYieldExpr);
project->setColNames(std::move(goCtx_->colNames));

SubPlan subPlan;
subPlan.root = project;
subPlan.tail = expand;
return subPlan;
}

SubPlan GoPlanner::doPlan() {
auto qctx = goCtx_->qctx;
auto& from = goCtx_->from;
Expand Down Expand Up @@ -259,6 +286,9 @@ StatusOr<SubPlan> GoPlanner::transform(AstContext* astCtx) {
subPlan.root = subPlan.tail = pt;
return subPlan;
}
if (goCtx_->isSimple) {
return doSimplePlan();
}
return doPlan();
}

Expand Down
2 changes: 2 additions & 0 deletions src/graph/planner/ngql/GoPlanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class GoPlanner final : public Planner {
private:
SubPlan doPlan();

SubPlan doSimplePlan();

private:
std::unique_ptr<VertexProps> buildVertexProps(const ExpressionProps::TagIDPropsMap& propsMap);

Expand Down
Loading