From c59074c7ac499001422d7932edaf9afdc5d2ace3 Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 16 Mar 2022 19:53:52 +0800 Subject: [PATCH] make mpp_fail test stable (#4300) close pingcap/tiflash#4299 --- .../src/DataStreams/ParallelAggregatingBlockInputStream.cpp | 6 +++++- dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 447dfdfbed5..9d9d73b3631 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -195,6 +195,9 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish() void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num) { parent.exceptions[thread_num] = exception; + Int32 old_value = -1; + parent.first_exception_index.compare_exchange_strong(old_value, static_cast(thread_num), std::memory_order_seq_cst, std::memory_order_relaxed); + /// can not cancel parent inputStream or the exception might be lost if (!parent.executed) /// kill the processor so ExchangeReceiver will be closed @@ -220,7 +223,8 @@ void ParallelAggregatingBlockInputStream::execute() processor.process(); processor.wait(); - rethrowFirstException(exceptions); + if (first_exception_index != -1) + std::rethrow_exception(exceptions[first_exception_index]); if (isCancelledOrThrowIfKilled()) return; diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h index b96fe3ad0e5..b7e9c3fe9a1 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h @@ -102,6 +102,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream ManyAggregatedDataVariants many_data; Exceptions exceptions; + std::atomic first_exception_index{-1}; struct ThreadData {