Skip to content

Commit

Permalink
fix concurrency bug (vesoft-inc#5702)
Browse files Browse the repository at this point in the history
fix concurrency bug
  • Loading branch information
nevermore3 authored Sep 1, 2023
1 parent f2c3d78 commit 8bc5cfc
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ auto Executor::runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator
}

// Gather all results and do post works
return folly::collect(futures).via(runner()).thenValue(std::move(gather));
return folly::collectAll(futures).via(runner()).thenValue(std::move(gather));
}
} // namespace graph
} // namespace nebula
Expand Down
46 changes: 36 additions & 10 deletions src/graph/executor/query/AppendVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,19 @@ folly::Future<Status> AppendVerticesExecutor::handleRespMultiJobs(
return buildVerticesResult(begin, end, tmpIter);
};

auto gather = [this](auto &&results) -> Status {
auto gather = [this](std::vector<folly::Try<StatusOr<DataSet>>> &&results) -> Status {
memory::MemoryCheckGuard guard;
for (auto &r : results) {
NG_RETURN_IF_ERROR(r);
auto &&rows = std::move(r).value();
for (auto &respVal : results) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto res = std::move(respVal).value();
auto &&rows = std::move(res).value();
std::move(rows.begin(), rows.end(), std::back_inserter(result_.rows));
}
return finish(ResultBuilder().value(Value(std::move(result_))).build());
Expand All @@ -203,20 +211,38 @@ folly::Future<Status> AppendVerticesExecutor::handleRespMultiJobs(
};

auto gather =
[this, inputIterNew = std::move(inputIter)](auto &&prepareResult) -> folly::Future<Status> {
[this, inputIterNew = std::move(inputIter)](
std::vector<folly::Try<folly::Unit>> &&prepareResult) -> folly::Future<Status> {
memory::MemoryCheckGuard guard1;
UNUSED(prepareResult);
for (auto &respVal : prepareResult) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
}

auto scatterInput =
[this](size_t begin, size_t end, Iterator *tmpIter) mutable -> StatusOr<DataSet> {
return handleJob(begin, end, tmpIter);
};

auto gatherFinal = [this](auto &&results) -> Status {
auto gatherFinal = [this](std::vector<folly::Try<StatusOr<DataSet>>> &&results) -> Status {
memory::MemoryCheckGuard guard2;
for (auto &r : results) {
NG_RETURN_IF_ERROR(r);
auto &&rows = std::move(r).value();
for (auto &respVal : results) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto res = std::move(respVal).value();
auto &&rows = std::move(res).value();
std::move(rows.begin(), rows.end(), std::back_inserter(result_.rows));
}
return finish(ResultBuilder().value(Value(std::move(result_))).build());
Expand Down
18 changes: 12 additions & 6 deletions src/graph/executor/query/FilterExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,21 @@ folly::Future<Status> FilterExecutor::execute() {
return handleJob(begin, end, tmpIter);
};

auto gather =
[this, result = std::move(ds), kind = iter->kind()](auto &&results) mutable -> Status {
auto gather = [this, result = std::move(ds), kind = iter->kind()](
std::vector<folly::Try<StatusOr<DataSet>>> &&results) mutable -> Status {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
for (auto &r : results) {
if (!r.ok()) {
return r.status();
for (auto &respVal : results) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto &&rows = std::move(r).value();
auto res = std::move(respVal).value();
auto &&rows = std::move(res).value();
result.rows.insert(result.rows.end(),
std::make_move_iterator(rows.begin()),
std::make_move_iterator(rows.end()));
Expand Down
30 changes: 24 additions & 6 deletions src/graph/executor/query/InnerJoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,22 @@ folly::Future<Status> InnerJoinExecutor::probe(const std::vector<Expression*>& p
return ds;
};

auto gather = [this](auto&& results) mutable -> Status {
auto gather = [this](std::vector<folly::Try<StatusOr<DataSet>>>&& results) mutable -> Status {
memory::MemoryCheckGuard guard;
DataSet result;
auto* joinNode = asNode<Join>(node());
result.colNames = joinNode->colNames();
for (auto& r : results) {
auto&& rows = std::move(r).value();
for (auto& respVal : results) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto res = std::move(respVal).value();
auto&& rows = std::move(res).value();
result.rows.insert(result.rows.end(),
std::make_move_iterator(rows.begin()),
std::make_move_iterator(rows.end()));
Expand All @@ -204,13 +213,22 @@ folly::Future<Status> InnerJoinExecutor::singleKeyProbe(Expression* probeKey, It
return ds;
};

auto gather = [this](auto&& results) mutable -> Status {
auto gather = [this](std::vector<folly::Try<StatusOr<DataSet>>>&& results) mutable -> Status {
memory::MemoryCheckGuard guard;
DataSet result;
auto* joinNode = asNode<Join>(node());
result.colNames = joinNode->colNames();
for (auto& r : results) {
auto&& rows = std::move(r).value();
for (auto& respVal : results) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto res = std::move(respVal).value();
auto&& rows = std::move(res).value();
result.rows.insert(result.rows.end(),
std::make_move_iterator(rows.begin()),
std::make_move_iterator(rows.end()));
Expand Down
30 changes: 24 additions & 6 deletions src/graph/executor/query/LeftJoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,22 @@ folly::Future<Status> LeftJoinExecutor::probe(const std::vector<Expression*>& pr
return ds;
};

auto gather = [this](auto&& results) mutable -> Status {
auto gather = [this](std::vector<folly::Try<StatusOr<DataSet>>>&& results) mutable -> Status {
memory::MemoryCheckGuard guard;
DataSet result;
auto* joinNode = asNode<Join>(node());
result.colNames = joinNode->colNames();
for (auto& r : results) {
auto&& rows = std::move(r).value();
for (auto& respVal : results) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto res = std::move(respVal).value();
auto&& rows = std::move(res).value();
result.rows.insert(result.rows.end(),
std::make_move_iterator(rows.begin()),
std::make_move_iterator(rows.end()));
Expand All @@ -180,13 +189,22 @@ folly::Future<Status> LeftJoinExecutor::singleKeyProbe(Expression* probeKey, Ite
return ds;
};

auto gather = [this](auto&& results) mutable -> Status {
auto gather = [this](std::vector<folly::Try<StatusOr<DataSet>>>&& results) mutable -> Status {
memory::MemoryCheckGuard guard;
DataSet result;
auto* joinNode = asNode<Join>(node());
result.colNames = joinNode->colNames();
for (auto& r : results) {
auto&& rows = std::move(r).value();
for (auto& respVal : results) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto res = std::move(respVal).value();
auto&& rows = std::move(res).value();
result.rows.insert(result.rows.end(),
std::make_move_iterator(rows.begin()),
std::make_move_iterator(rows.end()));
Expand Down
16 changes: 13 additions & 3 deletions src/graph/executor/query/ProjectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,20 @@ folly::Future<Status> ProjectExecutor::execute() {
return handleJob(begin, end, tmpIter);
};

auto gather = [this, result = std::move(ds)](auto &&results) mutable {
auto gather = [this, result = std::move(ds)](
std::vector<folly::Try<StatusOr<DataSet>>> &&results) mutable {
memory::MemoryCheckGuard guard;
for (auto &r : results) {
auto &&rows = std::move(r).value();
for (auto &respVal : results) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto res = std::move(respVal).value();
auto &&rows = std::move(res).value();
result.rows.insert(result.rows.end(),
std::make_move_iterator(rows.begin()),
std::make_move_iterator(rows.end()));
Expand Down
13 changes: 11 additions & 2 deletions src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,10 +456,19 @@ folly::Future<Status> TraverseExecutor::buildPathMultiJobs(size_t minStep, size_
return rows;
};

auto gather = [this](std::vector<std::vector<Row>> resp) mutable -> Status {
auto gather = [this](std::vector<folly::Try<std::vector<Row>>>&& resps) mutable -> Status {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
for (auto& rows : resp) {
for (auto& respVal : resps) {
if (respVal.hasException()) {
auto ex = respVal.exception().get_exception<std::bad_alloc>();
if (ex) {
throw std::bad_alloc();
} else {
throw std::runtime_error(respVal.exception().what().c_str());
}
}
auto rows = std::move(respVal).value();
if (rows.empty()) {
continue;
}
Expand Down

0 comments on commit 8bc5cfc

Please sign in to comment.