Skip to content

Commit

Permalink
cherry pick #4570 to release-5.4
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <[email protected]>
  • Loading branch information
windtalker committed Apr 22, 2022
1 parent 32bd9b7 commit 47925b5
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 5 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ void disableThreshold()
MEMORY_TRACER_SUBMIT_THRESHOLD = 0;
}

Int64 getLocalDeltaMemory()
{
return local_delta;
}

void alloc(Int64 size)
{
checkSubmitAndUpdateLocalDelta(local_delta + size);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ extern thread_local MemoryTracker * current_memory_tracker;
namespace CurrentMemoryTracker
{
void disableThreshold();
Int64 getLocalDeltaMemory();
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
parent.file_provider,
parent.threads_data[thread_num].key_columns,
parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].local_delta_memory,
parent.no_more_keys);

parent.threads_data[thread_num].src_rows += block.rows();
Expand Down Expand Up @@ -242,6 +243,7 @@ void ParallelAggregatingBlockInputStream::execute()
file_provider,
threads_data[0].key_columns,
threads_data[0].aggregate_columns,
threads_data[0].local_delta_memory,
no_more_keys);
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{
size_t src_rows = 0;
size_t src_bytes = 0;
Int64 local_delta_memory = 0;

ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
Expand Down
19 changes: 16 additions & 3 deletions dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,14 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns
}
}

bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
bool Aggregator::executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
const FileProviderPtr & file_provider,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
Int64 & local_delta_memory,
bool & no_more_keys)
{
if (isCancelled())
return true;
Expand Down Expand Up @@ -589,7 +596,13 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
size_t result_size = result.sizeWithoutOverflowRow();
Int64 current_memory_usage = 0;
if (current_memory_tracker)
{
current_memory_usage = current_memory_tracker->get();
auto updated_local_delta_memory = CurrentMemoryTracker::getLocalDeltaMemory();
auto local_delta_memory_diff = updated_local_delta_memory - local_delta_memory;
current_memory_usage += (local_memory_usage.fetch_add(local_delta_memory_diff) + local_delta_memory_diff);
local_delta_memory = updated_local_delta_memory;
}

auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Here all the results in the sum are taken into account, from different threads.

Expand Down Expand Up @@ -804,14 +817,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
src_rows += block.rows();
src_bytes += block.bytes();

if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, no_more_keys))
if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys))
break;
}

/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, no_more_keys);
executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys);

double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.sizeWithoutOverflowRow();
Expand Down
13 changes: 11 additions & 2 deletions dbms/src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ class Aggregator
AggregateDescriptions aggregates;
size_t keys_size;
size_t aggregates_size;
Int64 local_delta_memory = 0;

/// The settings of approximate calculation of GROUP BY.
const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
Expand Down Expand Up @@ -786,8 +787,14 @@ class Aggregator
using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;

/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
const FileProviderPtr & file_provider,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
Int64 & local_delta_memory,
bool & no_more_keys);

/** Convert the aggregation data structure into a block.
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
Expand Down Expand Up @@ -893,6 +900,8 @@ class Aggregator
/// How many RAM were used to process the query before processing the first block.
Int64 memory_usage_before_aggregation = 0;

std::atomic<Int64> local_memory_usage = 0;

std::mutex mutex;

const LogWithPrefixPtr log;
Expand Down

0 comments on commit 47925b5

Please sign in to comment.