Skip to content

Commit

Permalink
Merge branch 'master' into max_validate_depth
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 authored Dec 26, 2023
2 parents 3b299bd + 5d308fc commit d217352
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 45 deletions.
52 changes: 36 additions & 16 deletions src/graph/executor/algo/AllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,29 @@ folly::Future<Status> AllPathsExecutor::doAllPaths() {
break;
}
}
return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
if (!resp.ok()) {
return folly::makeFuture<Status>(std::move(resp));
}
}
if (leftSteps_ + rightSteps_ >= maxStep_ || leftNextStepVids_.empty() ||
rightNextStepVids_.empty()) {
return buildResult();
}
return doAllPaths();
});
return folly::collectAll(futures).via(runner()).thenValue(
[this](std::vector<folly::Try<Status>>&& resps) {
memory::MemoryCheckGuard guard;
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto resp = std::move(respVal).value();
if (!resp.ok()) {
return folly::makeFuture<Status>(std::move(resp));
}
}
if (leftSteps_ + rightSteps_ >= maxStep_ || leftNextStepVids_.empty() ||
rightNextStepVids_.empty()) {
return buildResult();
}
return doAllPaths();
});
}

folly::Future<Status> AllPathsExecutor::getNeighbors(bool reverse) {
Expand Down Expand Up @@ -545,12 +555,22 @@ folly::Future<Status> AllPathsExecutor::conjunctPath(std::vector<NPath*>& leftPa
runner(), [this, start, end, reverse]() { return probe(start, end, reverse); }));
}
}
return folly::collect(futures)
return folly::collectAll(futures)
.via(runner())
.thenValue([this, path = std::move(oneWayPath)](std::vector<std::vector<Row>>&& resps) {
.thenValue([this,
path = std::move(oneWayPath)](std::vector<folly::Try<std::vector<Row>>>&& resps) {
memory::MemoryCheckGuard guard;
result_.rows = std::move(path);
for (auto& rows : resps) {
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto rows = std::move(respVal).value();
if (rows.empty()) {
continue;
}
Expand Down
49 changes: 34 additions & 15 deletions src/graph/executor/algo/BatchShortestPath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,28 @@ folly::Future<Status> BatchShortestPath::execute(const HashSet& startVids,
resultDs_[rowNum].colNames = pathNode_->colNames();
futures.emplace_back(shortestPath(rowNum, 1));
}
return folly::collect(futures).via(runner()).thenValue([this, result](auto&& resps) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
NG_RETURN_IF_ERROR(resp);
}
result->colNames = pathNode_->colNames();
for (auto& ds : resultDs_) {
result->append(std::move(ds));
}
return Status::OK();
});
return folly::collectAll(futures).via(runner()).thenValue(
[this, result](std::vector<folly::Try<Status>>&& resps) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto resp = std::move(respVal).value();
NG_RETURN_IF_ERROR(resp);
}
result->colNames = pathNode_->colNames();
for (auto& ds : resultDs_) {
result->append(std::move(ds));
}
return Status::OK();
});
}

size_t BatchShortestPath::init(const HashSet& startVids, const HashSet& endVids) {
Expand Down Expand Up @@ -100,12 +110,21 @@ folly::Future<Status> BatchShortestPath::shortestPath(size_t rowNum, size_t step
std::vector<folly::Future<Status>> futures;
futures.emplace_back(getNeighbors(rowNum, stepNum, false));
futures.emplace_back(getNeighbors(rowNum, stepNum, true));
return folly::collect(futures)
return folly::collectAll(futures)
.via(runner())
.thenValue([this, rowNum, stepNum](auto&& resps) {
.thenValue([this, rowNum, stepNum](std::vector<folly::Try<Status>>&& resps) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto resp = std::move(respVal).value();
if (!resp.ok()) {
return folly::makeFuture<Status>(std::move(resp));
}
Expand Down
47 changes: 33 additions & 14 deletions src/graph/executor/algo/SingleShortestPath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,27 @@ folly::Future<Status> SingleShortestPath::execute(const HashSet& startVids,
resultDs_[rowNum].colNames = pathNode_->colNames();
futures.emplace_back(shortestPath(rowNum, 1));
}
return folly::collect(futures).via(runner()).thenValue([this, result](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
NG_RETURN_IF_ERROR(resp);
}
result->colNames = pathNode_->colNames();
for (auto& ds : resultDs_) {
result->append(std::move(ds));
}
return Status::OK();
});
return folly::collectAll(futures).via(runner()).thenValue(
[this, result](std::vector<folly::Try<Status>>&& resps) {
memory::MemoryCheckGuard guard;
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto resp = std::move(respVal).value();
NG_RETURN_IF_ERROR(resp);
}
result->colNames = pathNode_->colNames();
for (auto& ds : resultDs_) {
result->append(std::move(ds));
}
return Status::OK();
});
}

void SingleShortestPath::init(const HashSet& startVids, const HashSet& endVids, size_t rowSize) {
Expand Down Expand Up @@ -69,11 +79,20 @@ folly::Future<Status> SingleShortestPath::shortestPath(size_t rowNum, size_t ste
futures.reserve(2);
futures.emplace_back(getNeighbors(rowNum, stepNum, false));
futures.emplace_back(getNeighbors(rowNum, stepNum, true));
return folly::collect(futures)
return folly::collectAll(futures)
.via(runner())
.thenValue([this, rowNum, stepNum](auto&& resps) {
.thenValue([this, rowNum, stepNum](std::vector<folly::Try<Status>>&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto resp = std::move(respVal).value();
if (!resp.ok()) {
return folly::makeFuture<Status>(std::move(resp));
}
Expand Down

0 comments on commit d217352

Please sign in to comment.