Skip to content

Commit

Permalink
Cherry pick 3.5 (0420-0427) (#5530)
Browse files Browse the repository at this point in the history
* fixed yapf version  (#5513)

* Update requirements.txt

fixed yapf version to the last version supporting tomli 1.x

* Update requirements.txt

fix typo

* update unsupported action syntax (#5527)

update unsupported syntax

* optimize-all-path-go (#5528)

---------

Co-authored-by: George <[email protected]>
Co-authored-by: jimingquan <[email protected]>
  • Loading branch information
3 people authored Apr 27, 2023
1 parent b541615 commit dc747b3
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 129 deletions.
6 changes: 3 additions & 3 deletions .github/actions/tagname-action/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ runs:
tag=$(echo ${{ github.ref }} | rev | cut -d/ -f1 | rev)
tagnum=$(echo $tag | sed "s/^v//")
majorver=$(echo $tag | cut -d "." -f 1)
echo "::set-output name=tag::$tag"
echo "::set-output name=tagnum::$tagnum"
echo "::set-output name=majorver::$majorver"
echo "tag=$tag" >> $GITHUB_OUTPUT
echo "tagnum=$tagnum" >> $GITHUB_OUTPUT
echo "majorver=$majorver" >> $GITHUB_OUTPUT
shell: bash
1 change: 1 addition & 0 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ struct GoContext final : AstContext {
bool joinInput{false};
// true when $$.tag.prop exist
bool joinDst{false};
bool isSimple{false};

ExpressionProps exprProps;

Expand Down
143 changes: 95 additions & 48 deletions src/graph/executor/algo/AllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ DEFINE_uint32(
100,
"the number of vids to expand, when this threshold is exceeded, use heuristic expansion");
DEFINE_uint32(path_threshold_ratio, 2, "threshold for heuristics expansion");
DEFINE_uint32(path_batch_size, 5000, "number of paths constructed by each thread");
DEFINE_uint32(path_batch_size, 50000, "number of paths constructed by each thread");

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -156,13 +156,16 @@ folly::Future<Status> AllPathsExecutor::getNeighbors(bool reverse) {
}
auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);
time::Duration buildAdjTime;
auto key = folly::sformat("buildAdjTime {}step[{}]", reverse ? "reverse " : "", step);
if (reverse) {
rightNextStepVids_.clear();
expandFromRight(iter.get());
} else {
leftNextStepVids_.clear();
expandFromLeft(iter.get());
}
otherStats_.emplace(key, folly::sformat("{}(us)", buildAdjTime.elapsedInUSec()));
return Status::OK();
});
}
Expand Down Expand Up @@ -261,6 +264,7 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
// if key exists, discard the right adjacency's key & values
// because the right adjacency list may have fewer edges
// a->c->o, a->b, c->f, f->o
time::Duration mergeAdjTime;
for (auto& rAdj : rightAdjList_) {
auto& src = rAdj.first;
auto iter = leftAdjList_.find(src);
Expand Down Expand Up @@ -292,71 +296,106 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
}
}
}
otherStats_.emplace("merge_adj_time", folly::sformat("{}(us)", mergeAdjTime.elapsedInUSec()));
time::Duration buildPathTime;
auto future = buildPathMultiJobs();
return future.via(runner()).thenValue([this](auto&& resp) {
UNUSED(resp);
if (!withProp_ || emptyPropVids_.empty()) {
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
}
return getPathProps();
});
return future.via(runner())
.ensure([this, buildPathTime]() {
otherStats_.emplace("build_path_time",
folly::sformat("{}(us)", buildPathTime.elapsedInUSec()));
})
.thenValue([this](auto&& resp) {
UNUSED(resp);
if (!withProp_ || emptyPropVids_.empty()) {
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
}
return getPathProps();
});
}

folly::Future<Status> AllPathsExecutor::buildPathMultiJobs() {
auto pathsPtr = std::make_shared<std::vector<std::vector<Value>>>();
auto pathsPtr = std::make_shared<std::vector<NPath*>>();
if (threadLocalPtr_.get() == nullptr) {
threadLocalPtr_.reset(new std::deque<NPath>());
}
for (auto& vid : leftInitVids_) {
auto vidIter = leftAdjList_.find(vid);
if (vidIter == leftAdjList_.end()) {
continue;
}
auto src = vidIter->first;
auto& src = vidIter->first;
auto& adjEdges = vidIter->second;
if (adjEdges.empty()) {
continue;
}
pathsPtr->reserve(adjEdges.size() + pathsPtr->size());
for (auto& edge : adjEdges) {
pathsPtr->emplace_back(std::vector<Value>({src, edge}));
threadLocalPtr_->emplace_back(NPath(src, edge));
pathsPtr->emplace_back(&threadLocalPtr_->back());
}
}
size_t step = 2;
auto future = doBuildPath(step, 0, pathsPtr->size(), pathsPtr);
return future.via(runner()).thenValue([this](std::vector<Row>&& paths) {
return future.via(runner()).thenValue([this](std::vector<std::pair<NPath*, Value>>&& paths) {
memory::MemoryCheckGuard guard;

if (!paths.empty()) {
result_.rows.swap(paths);
time::Duration convertPathTime;
for (auto& path : paths) {
result_.rows.emplace_back(convertNPath2Row(path.first, path.second));
}
otherStats_.emplace("convert_path_time",
folly::sformat("{}(us)", convertPathTime.elapsedInUSec()));
}
return Status::OK();
});
}

folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<std::vector<Value>>> pathsPtr) {
if (cnt_.load(std::memory_order_relaxed) >= limit_) {
return folly::makeFuture<std::vector<Row>>(std::vector<Row>());
// construct ROW[src1, [e1, v2, e2], v3]
Row AllPathsExecutor::convertNPath2Row(NPath* path, Value dst) {
std::vector<Value> list;
NPath* head = path;
while (head != nullptr) {
list.emplace_back(head->edge);
list.emplace_back(head->vertex);
head = head->p;
}
Row row;
// add src;
row.values.emplace_back(list.back());
list.pop_back();
std::reverse(list.begin(), list.end());
List edgeList(std::move(list));
row.values.emplace_back(std::move(edgeList));
row.values.emplace_back(std::move(dst));
return row;
}

folly::Future<std::vector<std::pair<AllPathsExecutor::NPath*, Value>>>
AllPathsExecutor::doBuildPath(size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<NPath*>> pathsPtr) {
if (cnt_.load(std::memory_order_relaxed) >= limit_) {
return folly::makeFuture<std::vector<std::pair<NPath*, Value>>>(
std::vector<std::pair<NPath*, Value>>());
}
if (threadLocalPtr_.get() == nullptr) {
threadLocalPtr_.reset(new std::deque<NPath>());
}
auto& adjList = leftAdjList_;
auto currentPathPtr = std::make_unique<std::vector<Row>>();
auto newPathsPtr = std::make_shared<std::vector<std::vector<Value>>>();
auto currentStepResult = std::make_unique<std::vector<std::pair<NPath*, Value>>>();
auto newPathsPtr = std::make_shared<std::vector<NPath*>>();

for (auto i = start; i < end; ++i) {
auto& path = (*pathsPtr)[i];
auto& edgeValue = path.back();
auto path = (*pathsPtr)[i];
auto& edgeValue = path->edge;
DCHECK(edgeValue.isEdge());
auto& dst = edgeValue.getEdge().dst;
auto dstIter = rightInitVids_.find(dst);
if (dstIter != rightInitVids_.end()) {
Row row;
row.values.emplace_back(path.front());
List edgeList(std::vector<Value>(path.begin() + 1, path.end()));
row.values.emplace_back(std::move(edgeList));
row.values.emplace_back(*dstIter);
currentPathPtr->emplace_back(std::move(row));
currentStepResult->emplace_back(std::make_pair(path, *dstIter));
++cnt_;
if (cnt_.load(std::memory_order_relaxed) >= limit_) {
break;
Expand All @@ -379,20 +418,17 @@ folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
continue;
}
}
// copy
auto newPath = path;
newPath.emplace_back(adjIter->first);
newPath.emplace_back(edge);
newPathsPtr->emplace_back(std::move(newPath));
threadLocalPtr_->emplace_back(NPath(path, adjIter->first, edge));
newPathsPtr->emplace_back(&threadLocalPtr_->back());
}
}
}

auto newPathsSize = newPathsPtr->size();
if (step > maxStep_ || newPathsSize == 0) {
return folly::makeFuture<std::vector<Row>>(std::move(*currentPathPtr));
return folly::makeFuture<std::vector<std::pair<NPath*, Value>>>(std::move(*currentStepResult));
}
std::vector<folly::Future<std::vector<Row>>> futures;
std::vector<folly::Future<std::vector<std::pair<NPath*, Value>>>> futures;
if (newPathsSize < FLAGS_path_batch_size) {
futures.emplace_back(folly::via(runner(), [this, step, newPathsSize, newPathsPtr]() {
return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr);
Expand All @@ -407,9 +443,10 @@ folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
}
}
return folly::collect(futures).via(runner()).thenValue(
[pathPtr = std::move(currentPathPtr)](std::vector<std::vector<Row>>&& paths) {
[pathPtr = std::move(currentStepResult)](
std::vector<std::vector<std::pair<NPath*, Value>>>&& paths) {
memory::MemoryCheckGuard guard;
std::vector<Row> result = std::move(*pathPtr);
std::vector<std::pair<NPath*, Value>> result = std::move(*pathPtr);
for (auto& path : paths) {
if (path.empty()) {
continue;
Expand Down Expand Up @@ -441,19 +478,29 @@ folly::Future<Status> AllPathsExecutor::getPathProps() {
});
}

bool AllPathsExecutor::hasSameVertices(const std::vector<Value>& edgeList, const Edge& edge) {
bool AllPathsExecutor::hasSameEdge(NPath* path, const Edge& edge) {
NPath* head = path;
while (head != nullptr) {
if (edge == head->edge) {
return true;
}
head = head->p;
}
return false;
}

bool AllPathsExecutor::hasSameVertices(NPath* path, const Edge& edge) {
if (edge.src == edge.dst) {
return true;
}
auto& vid = edge.dst;
auto iter = edgeList.begin() + 1;
for (; iter != edgeList.end(); iter++) {
if (iter->isEdge()) {
auto& edgeVal = iter->getEdge();
if (edgeVal.src == vid) {
return true;
}
NPath* head = path;
while (head != nullptr) {
auto& vertex = head->vertex;
if (vertex.getVertex().vid == vid) {
return true;
}
head = head->p;
}
return false;
}
Expand Down
29 changes: 22 additions & 7 deletions src/graph/executor/algo/AllPathsExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#ifndef GRAPH_EXECUTOR_ALGO_ALLPATHSEXECUTOR_H_
#define GRAPH_EXECUTOR_ALGO_ALLPATHSEXECUTOR_H_

#include "folly/ThreadLocal.h"
#include "graph/executor/StorageAccessExecutor.h"

// Using the two-way BFS algorithm, a heuristic algorithm is used in the expansion process
Expand All @@ -26,10 +27,10 @@
// when expanding, if the vid has already been visited, do not visit again
// leftAdjList_ save result of forward expansion
// rightAdjList_ save result of backward expansion

namespace nebula {
namespace graph {
class AllPaths;
struct NPath;
class AllPathsExecutor final : public StorageAccessExecutor {
public:
AllPathsExecutor(const PlanNode* node, QueryContext* qctx)
Expand All @@ -46,6 +47,17 @@ class AllPathsExecutor final : public StorageAccessExecutor {
template <typename T = Value>
using VertexMap = std::unordered_map<Value, std::vector<T>, VertexHash, VertexEqual>;

struct NPath {
NPath* p{nullptr};
const Value& vertex;
const Value& edge;
NPath(const Value& v, const Value& e) : vertex(v), edge(e) {}
NPath(NPath* path, const Value& v, const Value& e) : p(path), vertex(v), edge(e) {}
NPath(NPath&& v) noexcept : p(v.p), vertex(std::move(v.vertex)), edge(std::move(v.edge)) {}
NPath(const NPath& v) : p(v.p), vertex(v.vertex), edge(v.edge) {}
~NPath() {}
};

private:
void buildRequestVids(bool reverse);

Expand All @@ -59,19 +71,20 @@ class AllPathsExecutor final : public StorageAccessExecutor {

void expandFromRight(GetNeighborsIter* iter);

folly::Future<std::vector<Row>> doBuildPath(
size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<std::vector<Value>>> edgeLists);
Row convertNPath2Row(NPath* path, Value dst);

folly::Future<std::vector<std::pair<NPath*, Value>>> doBuildPath(
size_t step, size_t start, size_t end, std::shared_ptr<std::vector<NPath*>> paths);

folly::Future<Status> getPathProps();

folly::Future<Status> buildPathMultiJobs();

folly::Future<Status> buildResult();

bool hasSameVertices(const std::vector<Value>& edgeList, const Edge& edge);
bool hasSameEdge(NPath* path, const Edge& edge);

bool hasSameVertices(NPath* path, const Edge& edge);

private:
const AllPaths* pathNode_{nullptr};
Expand All @@ -94,6 +107,8 @@ class AllPathsExecutor final : public StorageAccessExecutor {

DataSet result_;
std::vector<Value> emptyPropVids_;
class NewTag {};
folly::ThreadLocalPtr<std::deque<NPath>, NewTag> threadLocalPtr_;
};
} // namespace graph
} // namespace nebula
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/query/ExpandExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ folly::Future<Status> ExpandExecutor::GetDstBySrc() {
size = (*result.dsts_ref()).size();
}
auto info = util::collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("resp[{}]", i), folly::toPrettyJson(info));
otherStats_.emplace(folly::sformat("step{} resp [{}]", currentStep_, i),
folly::toPrettyJson(info));
}
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
if (!result.ok()) {
Expand Down
30 changes: 30 additions & 0 deletions src/graph/planner/ngql/GoPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,33 @@ PlanNode* GoPlanner::buildJoinDstPlan(PlanNode* dep) {
return join;
}

SubPlan GoPlanner::doSimplePlan() {
auto qctx = goCtx_->qctx;
size_t step = goCtx_->steps.mSteps();
auto* expand = Expand::make(qctx,
startNode_,
goCtx_->space.id,
false, // random
step,
buildEdgeProps(true));
expand->setEdgeTypes(buildEdgeTypes());
expand->setColNames({"_expand_vid"});
expand->setInputVar(goCtx_->vidsVar);

auto* dedup = Dedup::make(qctx, expand);

auto pool = qctx->objPool();
auto* newYieldExpr = pool->makeAndAdd<YieldColumns>();
newYieldExpr->addColumn(new YieldColumn(ColumnExpression::make(pool, 0)));
auto* project = Project::make(qctx, dedup, newYieldExpr);
project->setColNames(std::move(goCtx_->colNames));

SubPlan subPlan;
subPlan.root = project;
subPlan.tail = expand;
return subPlan;
}

SubPlan GoPlanner::doPlan() {
auto qctx = goCtx_->qctx;
auto& from = goCtx_->from;
Expand Down Expand Up @@ -257,6 +284,9 @@ StatusOr<SubPlan> GoPlanner::transform(AstContext* astCtx) {
subPlan.root = subPlan.tail = pt;
return subPlan;
}
if (goCtx_->isSimple) {
return doSimplePlan();
}
return doPlan();
}

Expand Down
Loading

0 comments on commit dc747b3

Please sign in to comment.