Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make performance of TPCH q15 stable (#4570) #4709

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,37 @@ void disableThreshold()
MEMORY_TRACER_SUBMIT_THRESHOLD = 0;
}

<<<<<<< HEAD
=======
void submitLocalDeltaMemory()
{
if (current_memory_tracker)
{
try
{
if (local_delta > 0)
{
current_memory_tracker->alloc(local_delta, false);
}
else if (local_delta < 0)
{
current_memory_tracker->free(-local_delta);
}
}
catch (...)
{
DB::tryLogCurrentException("MemoryTracker", "Failed when try to submit local delta memory");
}
}
local_delta = 0;
}

Int64 getLocalDeltaMemory()
{
return local_delta;
}

>>>>>>> feee96afe1 (Make performance of TPCH q15 stable (#4570))
void alloc(Int64 size)
{
checkSubmitAndUpdateLocalDelta(local_delta + size);
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ extern thread_local MemoryTracker * current_memory_tracker;
namespace CurrentMemoryTracker
{
void disableThreshold();
<<<<<<< HEAD
=======
void submitLocalDeltaMemory();
Int64 getLocalDeltaMemory();
>>>>>>> feee96afe1 (Make performance of TPCH q15 stable (#4570))
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
parent.file_provider,
parent.threads_data[thread_num].key_columns,
parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].local_delta_memory,
parent.no_more_keys);

parent.threads_data[thread_num].src_rows += block.rows();
Expand Down Expand Up @@ -261,6 +262,7 @@ void ParallelAggregatingBlockInputStream::execute()
file_provider,
threads_data[0].key_columns,
threads_data[0].aggregate_columns,
threads_data[0].local_delta_memory,
no_more_keys);
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{
size_t src_rows = 0;
size_t src_bytes = 0;
Int64 local_delta_memory = 0;

ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
Expand Down
19 changes: 16 additions & 3 deletions dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,14 @@ void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns
}
}

bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
bool Aggregator::executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
const FileProviderPtr & file_provider,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
Int64 & local_delta_memory,
bool & no_more_keys)
{
if (isCancelled())
return true;
Expand Down Expand Up @@ -603,7 +610,13 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
size_t result_size = result.sizeWithoutOverflowRow();
Int64 current_memory_usage = 0;
if (current_memory_tracker)
{
current_memory_usage = current_memory_tracker->get();
auto updated_local_delta_memory = CurrentMemoryTracker::getLocalDeltaMemory();
auto local_delta_memory_diff = updated_local_delta_memory - local_delta_memory;
current_memory_usage += (local_memory_usage.fetch_add(local_delta_memory_diff) + local_delta_memory_diff);
local_delta_memory = updated_local_delta_memory;
}

auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Here all the results in the sum are taken into account, from different threads.

Expand Down Expand Up @@ -818,14 +831,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
src_rows += block.rows();
src_bytes += block.bytes();

if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, no_more_keys))
if (!executeOnBlock(block, result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys))
break;
}

/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, no_more_keys);
executeOnBlock(stream->getHeader(), result, file_provider, key_columns, aggregate_columns, params.local_delta_memory, no_more_keys);

double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.sizeWithoutOverflowRow();
Expand Down
13 changes: 11 additions & 2 deletions dbms/src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ class Aggregator
AggregateDescriptions aggregates;
size_t keys_size;
size_t aggregates_size;
Int64 local_delta_memory = 0;

/// The settings of approximate calculation of GROUP BY.
const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
Expand Down Expand Up @@ -800,8 +801,14 @@ class Aggregator
using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;

/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result, const FileProviderPtr & file_provider, ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
const FileProviderPtr & file_provider,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
Int64 & local_delta_memory,
bool & no_more_keys);

/** Convert the aggregation data structure into a block.
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
Expand Down Expand Up @@ -907,6 +914,8 @@ class Aggregator
/// How many RAM were used to process the query before processing the first block.
Int64 memory_usage_before_aggregation = 0;

std::atomic<Int64> local_memory_usage = 0;

std::mutex mutex;

const LogWithPrefixPtr log;
Expand Down