From 47925b5bef6ce15bf80fc1896d10e68809f36088 Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 22 Apr 2022 13:14:06 +0800 Subject: [PATCH] cherry pick #4570 to release-5.4 Signed-off-by: xufei --- dbms/src/Common/MemoryTracker.cpp | 5 +++++ dbms/src/Common/MemoryTracker.h | 1 + .../ParallelAggregatingBlockInputStream.cpp | 2 ++ .../ParallelAggregatingBlockInputStream.h | 1 + dbms/src/Interpreters/Aggregator.cpp | 19 ++++++++++++++++--- dbms/src/Interpreters/Aggregator.h | 13 +++++++++++-- 6 files changed, 36 insertions(+), 5 deletions(-) diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index 9e556d4f716..705860e28ef 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -185,6 +185,11 @@ void disableThreshold() MEMORY_TRACER_SUBMIT_THRESHOLD = 0; } +Int64 getLocalDeltaMemory() +{ + return local_delta; +} + void alloc(Int64 size) { checkSubmitAndUpdateLocalDelta(local_delta + size); diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index cca7edea991..5c3938f0db9 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -96,6 +96,7 @@ extern thread_local MemoryTracker * current_memory_tracker; namespace CurrentMemoryTracker { void disableThreshold(); +Int64 getLocalDeltaMemory(); void alloc(Int64 size); void realloc(Int64 old_size, Int64 new_size); void free(Int64 size); diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index e0f66883851..3a7eed3c401 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -140,6 +140,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t parent.file_provider, parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns, + parent.threads_data[thread_num].local_delta_memory, parent.no_more_keys); parent.threads_data[thread_num].src_rows += block.rows(); @@ -242,6 +243,7 @@ void ParallelAggregatingBlockInputStream::execute() file_provider, threads_data[0].key_columns, threads_data[0].aggregate_columns, + threads_data[0].local_delta_memory, no_more_keys); } diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h index 5f539005a1f..b84901bf52f 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h @@ -86,6 +86,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream { size_t src_rows = 0; size_t src_bytes = 0; + Int64 local_delta_memory = 0; ColumnRawPtrs key_columns; Aggregator::AggregateColumns aggregate_columns; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 1a96a2bb50d..1aa034c42bb 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -511,7 +511,14 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns } } -bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys) +bool Aggregator::executeOnBlock( + const Block & block, + AggregatedDataVariants & result, + const FileProviderPtr & file_provider, + ColumnRawPtrs & key_columns, + AggregateColumns & aggregate_columns, + Int64 & local_delta_memory, + bool & no_more_keys) { if (isCancelled()) return true; @@ -589,7 +596,13 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re size_t result_size = result.sizeWithoutOverflowRow(); Int64 current_memory_usage = 0; if (current_memory_tracker) + { current_memory_usage = current_memory_tracker->get(); + auto updated_local_delta_memory = CurrentMemoryTracker::getLocalDeltaMemory(); + auto local_delta_memory_diff = updated_local_delta_memory - local_delta_memory; + current_memory_usage += (local_memory_usage.fetch_add(local_delta_memory_diff) + local_delta_memory_diff); + local_delta_memory = updated_local_delta_memory; + } auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Here all the results in the sum are taken into account, from different threads. @@ -804,14 +817,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria src_rows += block.rows(); src_bytes += block.bytes(); - if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, no_more_keys)) + if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys)) break; } /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation. /// To do this, we pass a block with zero rows to aggregate. if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set) - executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, no_more_keys); + executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys); double elapsed_seconds = watch.elapsedSeconds(); size_t rows = result.sizeWithoutOverflowRow(); diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index a5c709dab8f..5eb8784535f 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -693,6 +693,7 @@ class Aggregator AggregateDescriptions aggregates; size_t keys_size; size_t aggregates_size; + Int64 local_delta_memory = 0; /// The settings of approximate calculation of GROUP BY. const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by. @@ -786,8 +787,14 @@ class Aggregator using AggregateFunctionsPlainPtrs = std::vector; /// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break'). - bool executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block - bool & no_more_keys); + bool executeOnBlock( + const Block & block, + AggregatedDataVariants & result, + const FileProviderPtr & file_provider, + ColumnRawPtrs & key_columns, + AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block + Int64 & local_delta_memory, + bool & no_more_keys); /** Convert the aggregation data structure into a block. * If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block. @@ -893,6 +900,8 @@ class Aggregator /// How many RAM were used to process the query before processing the first block. Int64 memory_usage_before_aggregation = 0; + std::atomic local_memory_usage = 0; + std::mutex mutex; const LogWithPrefixPtr log;