diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index e79e6077366..f64881ae35a 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -226,6 +226,11 @@ void submitLocalDeltaMemory() local_delta = 0; } +Int64 getLocalDeltaMemory() /* Read-only accessor: returns the current value of local_delta and leaves it unchanged. */ +{ + return local_delta; +} + void alloc(Int64 size) { checkSubmitAndUpdateLocalDelta(local_delta + size); diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index 457377a7ce0..c87ec713dda 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -111,6 +111,7 @@ namespace CurrentMemoryTracker { void disableThreshold(); void submitLocalDeltaMemory(); +Int64 getLocalDeltaMemory(); /* Returns the current local_delta value; defined in MemoryTracker.cpp. */ void alloc(Int64 size); void realloc(Int64 old_size, Int64 new_size); void free(Int64 size); diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 62a7e7c4c46..3163975108f 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -157,6 +157,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t parent.file_provider, parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns, + parent.threads_data[thread_num].local_delta_memory, /* slot indexed by thread_num like the neighbouring arguments; presumably feeds executeOnBlock's local_delta_memory parameter (callee name is outside this hunk) */ parent.no_more_keys); parent.threads_data[thread_num].src_rows += block.rows(); @@ -270,6 +271,7 @@ void ParallelAggregatingBlockInputStream::execute() file_provider, threads_data[0].key_columns, threads_data[0].aggregate_columns, + threads_data[0].local_delta_memory, /* this call uses slot 0 for all of its per-thread arguments */ no_more_keys); } diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h index 3f486d2e35f..398c3d35bbc 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h +++ 
b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h @@ -106,6 +106,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream { size_t src_rows = 0; size_t src_bytes = 0; + Int64 local_delta_memory = 0; /* starts at 0; passed by reference into the aggregation call, one slot per threads_data entry */ ColumnRawPtrs key_columns; Aggregator::AggregateColumns aggregate_columns; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index ed640ce5d08..6e067b88d81 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -522,7 +522,14 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns } } -bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) +bool Aggregator::executeOnBlock( + const Block & block, + AggregatedDataVariants & result, + const FileProviderPtr & file_provider, + ColumnRawPtrs & key_columns, + AggregateColumns & aggregate_columns, + Int64 & local_delta_memory, /* in-out: read to form the diff below, then overwritten with the latest getLocalDeltaMemory() value */ + bool & no_more_keys) { if (isCancelled()) return true; @@ -600,7 +607,13 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re size_t result_size = result.sizeWithoutOverflowRow(); Int64 current_memory_usage = 0; if (current_memory_tracker) + { current_memory_usage = current_memory_tracker->get(); + auto updated_local_delta_memory = CurrentMemoryTracker::getLocalDeltaMemory(); + auto local_delta_memory_diff = updated_local_delta_memory - local_delta_memory; /* change relative to the value the caller last stored in local_delta_memory */ + current_memory_usage += (local_memory_usage.fetch_add(local_delta_memory_diff) + local_delta_memory_diff); /* fetch_add returns the value held before the addition, so adding the diff once more gives the updated local_memory_usage */ + local_delta_memory = updated_local_delta_memory; /* remember the value for the caller's next call */ + } auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Here all the results in the sum are taken into account, from different threads. 
@@ -815,14 +828,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria src_rows += block.rows(); src_bytes += block.bytes(); - if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, no_more_keys)) + if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys)) break; } /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation. /// To do this, we pass a block with zero rows to aggregate. if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set) - executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, no_more_keys); + executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys); double elapsed_seconds = watch.elapsedSeconds(); size_t rows = result.sizeWithoutOverflowRow(); diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 672a0951465..b3bb537dc2e 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -706,6 +706,7 @@ class Aggregator AggregateDescriptions aggregates; size_t keys_size; size_t aggregates_size; + Int64 local_delta_memory = 0; /* bookkeeping slot that Aggregator::execute() hands to executeOnBlock() as its local_delta_memory argument */ /// The settings of approximate calculation of GROUP BY. const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by. @@ -799,8 +800,14 @@ class Aggregator using AggregateFunctionsPlainPtrs = std::vector; /* NOTE(review): the template argument list of std::vector appears to have been lost from this context line; confirm the element type against the real header */ /// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break'). 
- bool executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block - bool & no_more_keys); + bool executeOnBlock( + const Block & block, + AggregatedDataVariants & result, + const FileProviderPtr & file_provider, + ColumnRawPtrs & key_columns, + AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block + Int64 & local_delta_memory, /* caller-supplied bookkeeping value, updated in place on each call */ + bool & no_more_keys); /** Convert the aggregation data structure into a block. * If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block. @@ -906,6 +913,8 @@ class Aggregator /// How many RAM were used to process the query before processing the first block. Int64 memory_usage_before_aggregation = 0; + std::atomic<Int64> local_memory_usage = 0; /* running sum of the local-delta diffs that executeOnBlock() adds with fetch_add; explicit Int64 because a data member cannot deduce its template argument */ + std::mutex mutex; const LoggerPtr log;