diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp
index 9e556d4f716..705860e28ef 100644
--- a/dbms/src/Common/MemoryTracker.cpp
+++ b/dbms/src/Common/MemoryTracker.cpp
@@ -185,6 +185,11 @@ void disableThreshold()
     MEMORY_TRACER_SUBMIT_THRESHOLD = 0;
 }
 
+Int64 getLocalDeltaMemory()
+{
+    return local_delta;
+}
+
 void alloc(Int64 size)
 {
     checkSubmitAndUpdateLocalDelta(local_delta + size);
diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h
index cca7edea991..5c3938f0db9 100644
--- a/dbms/src/Common/MemoryTracker.h
+++ b/dbms/src/Common/MemoryTracker.h
@@ -96,6 +96,7 @@ extern thread_local MemoryTracker * current_memory_tracker;
 namespace CurrentMemoryTracker
 {
 void disableThreshold();
+Int64 getLocalDeltaMemory();
 void alloc(Int64 size);
 void realloc(Int64 old_size, Int64 new_size);
 void free(Int64 size);
diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
index 79511dd30c8..bf41084e247 100644
--- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
@@ -140,6 +140,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
         parent.file_provider,
         parent.threads_data[thread_num].key_columns,
         parent.threads_data[thread_num].aggregate_columns,
+        parent.threads_data[thread_num].local_delta_memory,
         parent.no_more_keys);
 
     parent.threads_data[thread_num].src_rows += block.rows();
@@ -243,6 +244,7 @@ void ParallelAggregatingBlockInputStream::execute()
             file_provider,
             threads_data[0].key_columns,
             threads_data[0].aggregate_columns,
+            threads_data[0].local_delta_memory,
             no_more_keys);
 }
 
diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
index 02be1aeff5b..cc804152875 100644
--- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
+++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
@@ -88,6 +88,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
     {
         size_t src_rows = 0;
         size_t src_bytes = 0;
+        Int64 local_delta_memory = 0;
 
         ColumnRawPtrs key_columns;
         Aggregator::AggregateColumns aggregate_columns;
diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp
index 1a96a2bb50d..38c0ec84f01 100644
--- a/dbms/src/Interpreters/Aggregator.cpp
+++ b/dbms/src/Interpreters/Aggregator.cpp
@@ -74,11 +74,11 @@ void AggregatedDataVariants::convertToTwoLevel()
     switch (type)
     {
-#define M(NAME) \
-    case Type::NAME: \
-        NAME##_two_level = std::make_unique<decltype(NAME##_two_level)::element_type>(*NAME); \
-        NAME.reset(); \
-        type = Type::NAME##_two_level; \
+#define M(NAME) \
+    case Type::NAME: \
+        NAME##_two_level = std::make_unique<decltype(NAME##_two_level)::element_type>(*(NAME)); \
+        (NAME).reset(); \
+        type = Type::NAME##_two_level; \
         break;
 
         APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
 
@@ -453,7 +453,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
     AggregatedDataWithoutKey & res,
     size_t rows,
     AggregateFunctionInstruction * aggregate_instructions,
-    Arena * arena) const
+    Arena * arena)
 {
     /// Adding values
     for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
@@ -511,7 +511,14 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns
     }
 }
 
-bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
+bool Aggregator::executeOnBlock(
+    const Block & block,
+    AggregatedDataVariants & result,
+    const FileProviderPtr & file_provider,
+    ColumnRawPtrs & key_columns,
+    AggregateColumns & aggregate_columns,
+    Int64 & local_delta_memory,
+    bool & no_more_keys)
 {
     if (isCancelled())
         return true;
@@ -579,9 +586,9 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
     else if (result.type == AggregatedDataVariants::Type::NAME) \
         executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, result.collators, aggregate_functions_instructions.data(), no_more_keys, overflow_row_ptr);
 
-        if (false)
+        if (false) // NOLINT
         {
-        } // NOLINT
+        }
         APPLY_FOR_AGGREGATED_VARIANTS(M)
 #undef M
     }
@@ -589,7 +596,13 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
     size_t result_size = result.sizeWithoutOverflowRow();
     Int64 current_memory_usage = 0;
     if (current_memory_tracker)
+    {
         current_memory_usage = current_memory_tracker->get();
+        auto updated_local_delta_memory = CurrentMemoryTracker::getLocalDeltaMemory();
+        auto local_delta_memory_diff = updated_local_delta_memory - local_delta_memory;
+        current_memory_usage += (local_memory_usage.fetch_add(local_delta_memory_diff) + local_delta_memory_diff);
+        local_delta_memory = updated_local_delta_memory;
+    }
     auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;
 
     /// Here all the results in the sum are taken into account, from different threads.
@@ -642,7 +655,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
     else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
         writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out);
 
-    if (false)
+    if (false) // NOLINT
     {
     }
     APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@@ -804,14 +817,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
         src_rows += block.rows();
         src_bytes += block.bytes();
 
-        if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, no_more_keys))
+        if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys))
             break;
     }
 
     /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
     /// To do this, we pass a block with zero rows to aggregate.
     if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
-        executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, no_more_keys);
+        executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys);
 
     double elapsed_seconds = watch.elapsedSeconds();
     size_t rows = result.sizeWithoutOverflowRow();
@@ -1099,9 +1112,9 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v
 #define M(NAME) \
     else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
         convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_);
-    if (false)
+    if (false) // NOLINT
     {
-    } // NOLINT
+    }
     APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
 #undef M
     else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -1116,9 +1129,9 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevel(AggregatedDataVariants & dat
 #define M(NAME) \
     else if (data_variants.type == AggregatedDataVariants::Type::NAME) return prepareBlocksAndFillTwoLevelImpl(data_variants, *data_variants.NAME, final, thread_pool);
 
-    if (false)
+    if (false) // NOLINT
     {
-    } // NOLINT
+    }
     APPLY_FOR_VARIANTS_TWO_LEVEL(M)
 #undef M
     else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -1522,7 +1535,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
 #define M(NAME) \
     else if (first->type == AggregatedDataVariants::Type::NAME) \
         aggregator.mergeSingleLevelDataImpl<decltype(first->NAME)::element_type>(data);
-            if (false)
+            if (false) // NOLINT
             {
             }
             APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
@@ -1622,7 +1635,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream
         size_t thread_number = static_cast<size_t>(bucket_num) % threads;
         Arena * arena = merged_data.aggregates_pools.at(thread_number).get();
 
-        if (false) {}
+        if (false) {} // NOLINT
 #define M(NAME) \
     else if (method == AggregatedDataVariants::Type::NAME) \
     { \
@@ -1982,7 +1995,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
     else if (result.type == AggregatedDataVariants::Type::NAME) \
         mergeStreamsImpl(block, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
 
                 if (false)
+                if (false) // NOLINT
                 {
                 }
                 APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@@ -2006,7 +2019,9 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
             result.aggregates_pools.push_back(std::make_shared<Arena>());
             Arena * aggregates_pool = result.aggregates_pools.back().get();
 
-            auto task = std::bind(merge_bucket, bucket, aggregates_pool);
+            auto task = [&merge_bucket, bucket, aggregates_pool] {
+                return merge_bucket(bucket, aggregates_pool);
+            };
 
             if (thread_pool)
                 thread_pool->schedule(wrapInvocable(true, task));
@@ -2227,9 +2242,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
         type \
         = AggregatedDataVariants::Type::NAME##_two_level;
 
-    if (false)
+    if (false) // NOLINT
    {
-    } // NOLINT
+    }
     APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
 #undef M
     else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -2243,9 +2258,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
         num_buckets \
         = data.NAME->data.NUM_BUCKETS;
-    if (false)
+    if (false) // NOLINT
     {
-    } // NOLINT
+    }
     APPLY_FOR_VARIANTS_TWO_LEVEL(M)
 #undef M
     else
         throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -2256,9 +2271,9 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
     else if (data.type == AggregatedDataVariants::Type::NAME) \
         convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, key_columns, block, splitted_blocks);
 
-    if (false)
+    if (false) // NOLINT
     {
-    } // NOLINT
+    }
     APPLY_FOR_VARIANTS_TWO_LEVEL(M)
 #undef M
     else throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@@ -2315,9 +2330,9 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
     else if (result.type == AggregatedDataVariants::Type::NAME) \
         destroyImpl<decltype(result.NAME)::element_type>(result.NAME->data);
-    if (false)
+    if (false) // NOLINT
     {
-    } // NOLINT
+    }
     APPLY_FOR_AGGREGATED_VARIANTS(M)
 #undef M
     else if (result.type != AggregatedDataVariants::Type::without_key)
         throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h
index a5c709dab8f..85e8008f149 100644
--- a/dbms/src/Interpreters/Aggregator.h
+++ b/dbms/src/Interpreters/Aggregator.h
@@ -693,6 +693,7 @@ class Aggregator
         AggregateDescriptions aggregates;
         size_t keys_size;
         size_t aggregates_size;
+        Int64 local_delta_memory = 0;
 
         /// The settings of approximate calculation of GROUP BY.
         const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
@@ -786,8 +787,14 @@ class Aggregator
     using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;
 
     /// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
-    bool executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
-                        bool & no_more_keys);
+    bool executeOnBlock(
+        const Block & block,
+        AggregatedDataVariants & result,
+        const FileProviderPtr & file_provider,
+        ColumnRawPtrs & key_columns,
+        AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
+        Int64 & local_delta_memory,
+        bool & no_more_keys);
 
     /** Convert the aggregation data structure into a block.
       * If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
@@ -893,6 +900,8 @@ class Aggregator
     /// How many RAM were used to process the query before processing the first block.
     Int64 memory_usage_before_aggregation = 0;
 
+    std::atomic<Int64> local_memory_usage = 0;
+
     std::mutex mutex;
 
     const LogWithPrefixPtr log;
@@ -939,11 +948,11 @@ class Aggregator
         AggregateDataPtr overflow_row) const;
 
     /// For case when there are no keys (all aggregate into one row).
-    void executeWithoutKeyImpl(
+    static void executeWithoutKeyImpl(
         AggregatedDataWithoutKey & res,
         size_t rows,
        AggregateFunctionInstruction * aggregate_instructions,
-        Arena * arena) const;
+        Arena * arena);
 
     template <typename Method>
     void writeToTemporaryFileImpl(