Skip to content

Commit

Permalink
optimizer path (#4162)
Browse files Browse the repository at this point in the history
* optimizer multi-shortest path

* new algorithm

* fix error
  • Loading branch information
nevermore3 authored and Sophie-Xie committed Apr 20, 2022
1 parent c2e2dde commit d09f2e4
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 62 deletions.
3 changes: 1 addition & 2 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,12 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
std::vector<folly::Future<DataSet>> futures;
for (auto& vid : meetVids) {
batchVids.push_back(vid);
if (i == totalSize - 1 || batchVids.size() == batchSize) {
if (++i == totalSize || batchVids.size() == batchSize) {
auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() {
return doConjunct(vids, oddStep);
});
futures.emplace_back(std::move(future));
}
i++;
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
Expand Down
208 changes: 156 additions & 52 deletions src/graph/executor/algo/MultiShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,19 @@ folly::Future<Status> MultiShortestPathExecutor::execute() {
})
.thenValue([this](auto&& resp) {
UNUSED(resp);
preLeftPaths_.swap(leftPaths_);
preRightPaths_.swap(rightPaths_);
preRightPaths_ = rightPaths_;
// update history
for (auto& iter : leftPaths_) {
historyLeftPaths_[iter.first].insert(std::make_move_iterator(iter.second.begin()),
std::make_move_iterator(iter.second.end()));
}
for (auto& iter : rightPaths_) {
historyRightPaths_[iter.first].insert(std::make_move_iterator(iter.second.begin()),
std::make_move_iterator(iter.second.end()));
}
leftPaths_.clear();
rightPaths_.clear();

step_++;
DataSet ds;
ds.colNames = pathNode_->colNames();
Expand All @@ -58,24 +67,40 @@ void MultiShortestPathExecutor::init() {
for (; rIter->valid(); rIter->next()) {
auto& vid = rIter->getColumn(0);
if (rightVids.emplace(vid).second) {
preRightPaths_[vid].push_back({Path(Vertex(vid, {}), {})});
std::vector<Path> tmp({Path(Vertex(vid, {}), {})});
historyRightPaths_[vid].emplace(vid, tmp);
preRightPaths_[vid].emplace(vid, std::move(tmp));
}
}

std::set<Value> leftVids;
for (; lIter->valid(); lIter->next()) {
auto& vid = lIter->getColumn(0);
std::vector<Path> tmp({Path(Vertex(vid, {}), {})});
historyLeftPaths_[vid].emplace(vid, std::move(tmp));
leftVids.emplace(vid);
}
for (const auto& leftVid : leftVids) {
for (const auto& rightVid : rightVids) {
if (leftVid != rightVid) {
terminationMap_.insert({leftVid, {rightVid, true}});
terminationMap_.emplace(leftVid, std::make_pair(rightVid, true));
}
}
}
}

std::vector<Path> MultiShortestPathExecutor::createPaths(const std::vector<Path>& paths,
const Edge& edge) {
std::vector<Path> newPaths;
newPaths.reserve(paths.size());
for (const auto& p : paths) {
Path path = p;
path.steps.emplace_back(Step(Vertex(edge.dst, {}), edge.type, edge.name, edge.ranking, {}));
newPaths.emplace_back(std::move(path));
}
return newPaths;
}

Status MultiShortestPathExecutor::buildPath(bool reverse) {
auto iter = reverse ? ectx_->getResult(pathNode_->rightInputVar()).iter()
: ectx_->getResult(pathNode_->leftInputVar()).iter();
Expand All @@ -96,10 +121,23 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
Path path;
path.src = Vertex(src, {});
path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {}));
currentPaths[dst].emplace_back(std::move(path));
auto foundDst = currentPaths.find(dst);
if (foundDst != currentPaths.end()) {
auto foundSrc = foundDst->second.find(src);
if (foundSrc != foundDst->second.end()) {
// same <src, dst>, different edge type or rank
foundSrc->second.emplace_back(std::move(path));
} else {
std::vector<Path> tmp({std::move(path)});
foundDst->second.emplace(src, std::move(tmp));
}
} else {
std::vector<Path> tmp({std::move(path)});
currentPaths[dst].emplace(src, std::move(tmp));
}
}
} else {
auto& historyPaths = reverse ? preRightPaths_ : preLeftPaths_;
auto& historyPaths = reverse ? historyRightPaths_ : historyLeftPaths_;
for (; iter->valid(); iter->next()) {
auto edgeVal = iter->getEdge();
if (UNLIKELY(!edgeVal.isEdge())) {
Expand All @@ -108,50 +146,93 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) {
auto& edge = edgeVal.getEdge();
auto& src = edge.src;
auto& dst = edge.dst;
for (const auto& histPath : historyPaths[src]) {
Path path = histPath;
path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {}));
if (path.hasDuplicateVertices()) {
continue;
auto& prePaths = historyPaths[src];

auto foundHistDst = historyPaths.find(dst);
if (foundHistDst == historyPaths.end()) {
// dst not in history
auto foundDst = currentPaths.find(dst);
if (foundDst == currentPaths.end()) {
// dst not in current, new edge
for (const auto& prePath : prePaths) {
currentPaths[dst].emplace(prePath.first, createPaths(prePath.second, edge));
}
} else {
// dst in current
for (const auto& prePath : prePaths) {
auto newPaths = createPaths(prePath.second, edge);
auto foundSrc = foundDst->second.find(prePath.first);
if (foundSrc == foundDst->second.end()) {
foundDst->second.emplace(prePath.first, std::move(newPaths));
} else {
foundSrc->second.insert(foundSrc->second.begin(),
std::make_move_iterator(newPaths.begin()),
std::make_move_iterator(newPaths.end()));
}
}
}
} else {
// dst in history
auto& historyDstPaths = foundHistDst->second;
for (const auto& prePath : prePaths) {
if (historyDstPaths.find(prePath.first) != historyDstPaths.end()) {
// loop: a->b->c->a or a->b->c->b,
// filter out paths that contain a duplicate vertex or have already been found before
continue;
}
auto foundDst = currentPaths.find(dst);
if (foundDst == currentPaths.end()) {
currentPaths[dst].emplace(prePath.first, createPaths(prePath.second, edge));
} else {
auto newPaths = createPaths(prePath.second, edge);
auto foundSrc = foundDst->second.find(prePath.first);
if (foundSrc == foundDst->second.end()) {
foundDst->second.emplace(prePath.first, std::move(newPaths));
} else {
foundSrc->second.insert(foundSrc->second.begin(),
std::make_move_iterator(newPaths.begin()),
std::make_move_iterator(newPaths.end()));
}
}
}
currentPaths[dst].emplace_back(std::move(path));
}
}
}

// set nextVid
const auto& nextVidVar = reverse ? pathNode_->rightVidVar() : pathNode_->leftVidVar();
setNextStepVid(currentPaths, nextVidVar);
return Status::OK();
}

DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
Interims::iterator endIter,
bool oddStep) {
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
DataSet ds;
for (; startIter != endIter; ++startIter) {
auto found = rightPaths.find(startIter->first);
if (found == rightPaths.end()) {
continue;
}
for (const auto& lPath : startIter->second) {
const auto& srcVid = lPath.src.vid;
auto range = terminationMap_.equal_range(srcVid);
for (const auto& rPath : found->second) {
const auto& dstVid = rPath.src.vid;
for (auto iter = range.first; iter != range.second; ++iter) {
if (iter->second.first == dstVid) {
auto forwardPath = lPath;
auto backwardPath = rPath;
DataSet MultiShortestPathExecutor::doConjunct(
const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters) {
auto buildPaths =
[](const std::vector<Path>& leftPaths, const std::vector<Path>& rightPaths, DataSet& ds) {
for (const auto& leftPath : leftPaths) {
for (const auto& rightPath : rightPaths) {
auto forwardPath = leftPath;
auto backwardPath = rightPath;
backwardPath.reverse();
forwardPath.append(std::move(backwardPath));
if (forwardPath.hasDuplicateVertices()) {
continue;
}
Row row;
row.values.emplace_back(std::move(forwardPath));
ds.rows.emplace_back(std::move(row));
iter->second.second = false;
}
}
};

DataSet ds;
for (const auto& iter : iters) {
const auto& leftPaths = iter.first->second;
const auto& rightPaths = iter.second->second;
for (const auto& leftPath : leftPaths) {
auto range = terminationMap_.equal_range(leftPath.first);
for (const auto& rightPath : rightPaths) {
for (auto found = range.first; found != range.second; ++found) {
if (found->second.first == rightPath.first) {
buildPaths(leftPath.second, rightPath.second, ds);
found->second.second = false;
}
}
}
Expand All @@ -161,28 +242,51 @@ DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter,
}

folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
size_t batchSize = leftPaths_.size() / static_cast<size_t>(FLAGS_num_operator_threads);
auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_;
size_t leftPathSize = leftPaths_.size();
size_t rightPathSize = rightPaths.size();
std::vector<folly::Future<DataSet>> futures;
size_t i = 0;
std::vector<std::pair<Interims::iterator, Interims::iterator>> pathIters;

auto startIter = leftPaths_.begin();
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
if (i++ == batchSize) {
auto endIter = leftIter;
endIter++;
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
return doConjunct(startIter, endIter, oddStep);
});
futures.emplace_back(std::move(future));
i = 0;
startIter = endIter;
size_t i = 0;
if (leftPathSize > rightPathSize) {
size_t batchSize = leftPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
pathIters.reserve(batchSize);
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
auto rightIter = rightPaths.find(leftIter->first);
if (rightIter == rightPaths.end()) {
continue;
}
pathIters.emplace_back(leftIter, rightIter);
if (++i == batchSize) {
auto future = folly::via(
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
pathIters.reserve(batchSize);
i = 0;
}
}
} else {
size_t batchSize = rightPathSize / static_cast<size_t>(FLAGS_num_operator_threads);
pathIters.reserve(batchSize);
for (auto rightIter = rightPaths.begin(); rightIter != rightPaths.end(); ++rightIter) {
auto leftIter = leftPaths_.find(rightIter->first);
if (leftIter == leftPaths_.end()) {
continue;
}
pathIters.emplace_back(leftIter, rightIter);
if (++i == batchSize) {
auto future = folly::via(
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
pathIters.reserve(batchSize);
i = 0;
}
}
}
if (i != 0) {
auto endIter = leftPaths_.end();
auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() {
return doConjunct(startIter, endIter, oddStep);
});
auto future =
folly::via(runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
futures.emplace_back(std::move(future));
}

Expand Down
13 changes: 8 additions & 5 deletions src/graph/executor/algo/MultiShortestPathExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,27 @@ class MultiShortestPathExecutor final : public Executor {
folly::Future<Status> execute() override;

private:
// k: dst, v: paths to dst
using Interims = std::unordered_map<Value, std::vector<Path>>;
// key: dst, value: {key : src, value: paths}
using Interims = std::unordered_map<Value, std::unordered_map<Value, std::vector<Path>>>;

void init();
std::vector<Path> createPaths(const std::vector<Path>& paths, const Edge& edge);
Status buildPath(bool reverse);
folly::Future<bool> conjunctPath(bool oddStep);
DataSet doConjunct(Interims::iterator startIter, Interims::iterator endIter, bool oddStep);
DataSet doConjunct(const std::vector<std::pair<Interims::iterator, Interims::iterator>>& iters);
void setNextStepVid(const Interims& paths, const string& var);

private:
const MultiShortestPath* pathNode_{nullptr};
size_t step_{1};
std::string terminationVar_;
// {src, <dst, true>}
std::unordered_multimap<Value, std::pair<Value, bool>> terminationMap_;
Interims leftPaths_;
Interims preLeftPaths_;
Interims preRightPaths_;
Interims rightPaths_;
Interims preRightPaths_;
Interims historyLeftPaths_;
Interims historyRightPaths_;
DataSet currentDs_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/algo/ProduceAllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ folly::Future<Status> ProduceAllPathsExecutor::conjunctPath() {

auto startIter = leftPaths_.begin();
for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) {
if (i++ == batchSize) {
if (++i == batchSize) {
auto endIter = leftIter;
endIter++;
auto oddStepFuture = folly::via(
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/test/FindPathTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ TEST_F(FindPathTest, multiSourceShortestPath) {
{
DataSet expectLeftVid;
expectLeftVid.colNames = {nebula::kVid};
for (const auto& vid : {"a", "b", "c", "f", "g"}) {
for (const auto& vid : {"b", "f", "g"}) {
Row row;
row.values.emplace_back(vid);
expectLeftVid.rows.emplace_back(std::move(row));
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ DEFINE_int32(num_netio_threads,
"The number of networking threads, 0 for number of physical CPU cores");
DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections");
DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries");
DEFINE_int32(num_operator_threads, 5, "Number of threads to execute a single operator");
DEFINE_int32(num_operator_threads, 2, "Number of threads to execute a single operator");
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket");
DEFINE_string(listen_netdev, "any", "The network device to listen on");
Expand Down

0 comments on commit d09f2e4

Please sign in to comment.