Skip to content

Commit

Permalink
make mpp_fail test stable (pingcap#4300)
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker committed Jun 15, 2022
1 parent 6f17280 commit e471acf
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
6 changes: 5 additions & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish()
void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num)
{
parent.exceptions[thread_num] = exception;
Int32 old_value = -1;
parent.first_exception_index.compare_exchange_strong(old_value, static_cast<Int32>(thread_num), std::memory_order_seq_cst, std::memory_order_relaxed);

/// can not cancel parent inputStream or the exception might be lost
if (!parent.executed)
parent.processor.cancel(false);
Expand All @@ -205,7 +208,8 @@ void ParallelAggregatingBlockInputStream::execute()
processor.process();
processor.wait();

rethrowFirstException(exceptions);
if (first_exception_index != -1)
std::rethrow_exception(exceptions[first_exception_index]);

if (isCancelledOrThrowIfKilled())
return;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream

ManyAggregatedDataVariants many_data;
Exceptions exceptions;
std::atomic<Int32> first_exception_index{-1};

struct ThreadData
{
Expand Down

0 comments on commit e471acf

Please sign in to comment.