Skip to content

Commit

Permalink
Fix race in function mergeToViaEmplace (#2617)
Browse files Browse the repository at this point in the history
  • Loading branch information
LittleFall authored Aug 9, 2021
1 parent 1376786 commit 9776ea6
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1572,8 +1572,8 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
bool final;
size_t threads;

Int32 current_bucket_num = -1;
Int32 max_scheduled_bucket_num = -1;
std::atomic<Int32> current_bucket_num = -1;
std::atomic<Int32> max_scheduled_bucket_num = -1;
static constexpr Int32 NUM_BUCKETS = 256;

struct ParallelMergeData
Expand All @@ -1591,12 +1591,12 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream

void scheduleThreadForNextBucket()
{
++max_scheduled_bucket_num;
if (max_scheduled_bucket_num >= NUM_BUCKETS)
int num = max_scheduled_bucket_num.fetch_add(1) + 1;
if (num >= NUM_BUCKETS)
return;

parallel_merge_data->pool.schedule(
ThreadFactory(true, "MergingAggregtd").newJob([this]{ thread(max_scheduled_bucket_num); }));
ThreadFactory(true, "MergingAggregtd").newJob([this, num]{ thread(num); }));
}

void thread(Int32 bucket_num)
Expand Down

0 comments on commit 9776ea6

Please sign in to comment.