Skip to content

Commit

Permalink
make mpp_fail test stable (#4300)
Browse files Browse the repository at this point in the history
close #4299
  • Loading branch information
windtalker authored Mar 16, 2022
1 parent 8fa861a commit 10536de
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
6 changes: 5 additions & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish()
void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num)
{
parent.exceptions[thread_num] = exception;
Int32 old_value = -1;
parent.first_exception_index.compare_exchange_strong(old_value, static_cast<Int32>(thread_num), std::memory_order_seq_cst, std::memory_order_relaxed);

/// can not cancel parent inputStream or the exception might be lost
if (!parent.executed)
/// kill the processor so ExchangeReceiver will be closed
Expand All @@ -220,7 +223,8 @@ void ParallelAggregatingBlockInputStream::execute()
processor.process();
processor.wait();

rethrowFirstException(exceptions);
if (first_exception_index != -1)
std::rethrow_exception(exceptions[first_exception_index]);

if (isCancelledOrThrowIfKilled())
return;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream

ManyAggregatedDataVariants many_data;
Exceptions exceptions;
std::atomic<Int32> first_exception_index{-1};

struct ThreadData
{
Expand Down

0 comments on commit 10536de

Please sign in to comment.