From a23adc27d19f944b9c2732528bc9f1f16ac2a534 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Thu, 12 Aug 2021 17:43:17 +0800 Subject: [PATCH] Fix race in function mergeToViaEmplace (#2617) (#2639) --- dbms/src/Interpreters/Aggregator.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 90b1388b52c..92bce9e6ce0 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -1572,8 +1572,8 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream bool final; size_t threads; - Int32 current_bucket_num = -1; - Int32 max_scheduled_bucket_num = -1; + std::atomic current_bucket_num = -1; + std::atomic max_scheduled_bucket_num = -1; static constexpr Int32 NUM_BUCKETS = 256; struct ParallelMergeData @@ -1591,12 +1591,12 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream void scheduleThreadForNextBucket() { - ++max_scheduled_bucket_num; - if (max_scheduled_bucket_num >= NUM_BUCKETS) + int num = max_scheduled_bucket_num.fetch_add(1) + 1; + if (num >= NUM_BUCKETS) return; parallel_merge_data->pool.schedule( - ThreadFactory(true, "MergingAggregtd").newJob([this]{ thread(max_scheduled_bucket_num); })); + ThreadFactory(true, "MergingAggregtd").newJob([this, num]{ thread(num); })); } void thread(Int32 bucket_num)