From e471acfff0303a683e04d340a2e1ae37fd7fb890 Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 16 Mar 2022 19:53:52 +0800 Subject: [PATCH] make mpp_fail test stable (#4300) close pingcap/tiflash#4299 --- .../src/DataStreams/ParallelAggregatingBlockInputStream.cpp | 6 +++++- dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 7705ef8847a..c2c0004c04c 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -181,6 +181,9 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish() void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num) { parent.exceptions[thread_num] = exception; + Int32 old_value = -1; + parent.first_exception_index.compare_exchange_strong(old_value, static_cast(thread_num), std::memory_order_seq_cst, std::memory_order_relaxed); + /// can not cancel parent inputStream or the exception might be lost if (!parent.executed) parent.processor.cancel(false); @@ -205,7 +208,8 @@ void ParallelAggregatingBlockInputStream::execute() processor.process(); processor.wait(); - rethrowFirstException(exceptions); + if (first_exception_index != -1) + std::rethrow_exception(exceptions[first_exception_index]); if (isCancelledOrThrowIfKilled()) return; diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h index 02be1aeff5b..c91369dbf98 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h @@ -83,6 +83,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream ManyAggregatedDataVariants many_data; Exceptions exceptions; + std::atomic first_exception_index{-1}; struct ThreadData {