From 53862f2b06d3952d74966f90dde79bf7d5189eb5 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Fri, 13 Jan 2023 21:39:46 +0800 Subject: [PATCH] split aggregation out block smaller than max block size (#6590) close pingcap/tiflash#6579 --- dbms/src/Core/Block.h | 1 + .../MergingAndConvertingBlockInputStream.h | 68 ++-- .../ParallelAggregatingBlockInputStream.cpp | 3 +- .../AggregationInterpreterHelper.cpp | 1 + .../tests/gtest_aggregation_executor.cpp | 56 +++- dbms/src/Flash/tests/gtest_join_executor.cpp | 2 +- dbms/src/Flash/tests/gtest_spill_sort.cpp | 2 +- dbms/src/Interpreters/Aggregator.cpp | 294 +++++++++++++++++- dbms/src/Interpreters/Aggregator.h | 48 ++- .../Interpreters/InterpreterSelectQuery.cpp | 2 +- 10 files changed, 445 insertions(+), 32 deletions(-) diff --git a/dbms/src/Core/Block.h b/dbms/src/Core/Block.h index 5f2cabe7859..b63415d511c 100644 --- a/dbms/src/Core/Block.h +++ b/dbms/src/Core/Block.h @@ -157,6 +157,7 @@ class Block using Blocks = std::vector; using BlocksList = std::list; +using BucketBlocksListMap = std::map; Block mergeBlocks(Blocks && blocks); diff --git a/dbms/src/DataStreams/MergingAndConvertingBlockInputStream.h b/dbms/src/DataStreams/MergingAndConvertingBlockInputStream.h index e9ec1f1fdeb..c772c4ffd46 100644 --- a/dbms/src/DataStreams/MergingAndConvertingBlockInputStream.h +++ b/dbms/src/DataStreams/MergingAndConvertingBlockInputStream.h @@ -77,9 +77,6 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream if (data.empty()) return {}; - if (current_bucket_num >= NUM_BUCKETS) - return {}; - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_merge_failpoint); AggregatedDataVariantsPtr & first = data[0]; @@ -91,14 +88,21 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream if (first->type == AggregatedDataVariants::Type::without_key) { aggregator.mergeWithoutKeyDataImpl(data); - return aggregator.prepareBlockAndFillWithoutKey( + single_level_blocks = aggregator.prepareBlocksAndFillWithoutKey( *first, final); + return popBlocksListFront(single_level_blocks); } } if (!first->isTwoLevel()) { + Block out_block = popBlocksListFront(single_level_blocks); + if (likely(out_block)) + { + return out_block; + } + if (current_bucket_num > 0) return {}; @@ -120,10 +124,16 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); } #undef M - return aggregator.prepareBlockAndFillSingleLevel(*first, final); + single_level_blocks = aggregator.prepareBlocksAndFillSingleLevel(*first, final); + return popBlocksListFront(single_level_blocks); } else { + Block out_block = popBlocksListFront(two_level_blocks); + if (likely(out_block)) + { + return out_block; + } if (!parallel_merge_data) { parallel_merge_data = std::make_unique(threads); @@ -131,40 +141,47 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream scheduleThreadForNextBucket(); } - Block res; - while (true) { std::unique_lock lock(parallel_merge_data->mutex); + if (current_bucket_num >= NUM_BUCKETS) + { + return {}; + } + if (parallel_merge_data->exception) std::rethrow_exception(parallel_merge_data->exception); auto it = parallel_merge_data->ready_blocks.find(current_bucket_num); if (it != parallel_merge_data->ready_blocks.end()) { - ++current_bucket_num; scheduleThreadForNextBucket(); + current_bucket_num++; - if (it->second) + if (!it->second.empty()) { - res.swap(it->second); - break; + two_level_blocks.splice(two_level_blocks.end(), std::move(it->second), it->second.begin(), it->second.end()); + + Block out = popBlocksListFront(two_level_blocks); + if (likely(out)) + { + return out; + } } - else if (current_bucket_num >= NUM_BUCKETS) - break; - } + continue; + } parallel_merge_data->condvar.wait(lock); } - - return res; } } private: const LoggerPtr log; const Aggregator & aggregator; + BlocksList single_level_blocks; + BlocksList two_level_blocks; ManyAggregatedDataVariants data; bool final; size_t threads; @@ -175,7 +192,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream struct ParallelMergeData { - std::map ready_blocks; + BucketBlocksListMap ready_blocks; std::exception_ptr exception; std::mutex mutex; std::condition_variable condvar; @@ -203,7 +220,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream { auto & merged_data = *data[0]; auto method = merged_data.type; - Block block; + BlocksList blocks; /// Select Arena to avoid race conditions size_t thread_number = static_cast(bucket_num) % threads; @@ -213,7 +230,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream case AggregationMethodType(NAME): \ { \ aggregator.mergeBucketImpl(data, bucket_num, arena); \ - block = aggregator.convertOneBucketToBlock( \ + blocks = aggregator.convertOneBucketToBlocks( \ merged_data, \ *ToAggregationMethodPtr(NAME, merged_data.aggregation_method_impl), \ arena, \ @@ -230,7 +247,7 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream #undef M std::lock_guard lock(parallel_merge_data->mutex); - parallel_merge_data->ready_blocks[bucket_num] = std::move(block); + parallel_merge_data->ready_blocks[bucket_num] = std::move(blocks); } catch (...) { @@ -241,6 +258,17 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream parallel_merge_data->condvar.notify_all(); } + + static Block popBlocksListFront(BlocksList & blocks) + { + if (!blocks.empty()) + { + Block out_block = blocks.front(); + blocks.pop_front(); + return out_block; + } + return {}; + } }; #undef AggregationMethodName diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 0f18406f00e..ae5119d6978 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -118,9 +118,8 @@ Block ParallelAggregatingBlockInputStream::readImpl() executed = true; } - Block res; if (isCancelledOrThrowIfKilled() || !impl) - return res; + return {}; return impl->read(); } diff --git a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp index 09efb06b913..1130f8ec325 100644 --- a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp +++ b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp @@ -104,6 +104,7 @@ Aggregator::Params buildParams( settings.max_bytes_before_external_group_by, !is_final_agg, context.getTemporaryPath(), + context.getSettingsRef().max_block_size, has_collator ? collators : TiDB::dummy_collators); } diff --git a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp index f89990ad381..9ccf3142707 100644 --- a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp +++ b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp @@ -150,6 +150,20 @@ class AggExecutorTestRunner : public ExecutorTest {{"key", TiDB::TP::TypeLong}, {"value", TiDB::TP::TypeString}}, {toNullableVec("key", key), toNullableVec("value", value)}); } + { + // with 1024 types of key. + std::vector::FieldType>> key(1024); + std::vector> value(1024); + for (size_t i = 0; i < 1024; ++i) + { + key[i] = i; + value[i] = {fmt::format("val_{}", i)}; + } + context.addMockTable( + {"test_db", "big_table_4"}, + {{"key", TiDB::TP::TypeLong}, {"value", TiDB::TP::TypeString}}, + {toNullableVec("key", key), toNullableVec("value", value)}); + } } std::shared_ptr buildDAGRequest(std::pair src, MockAstVec agg_funcs, MockAstVec group_by_exprs, MockColumnNameVec proj) @@ -531,7 +545,7 @@ CATCH TEST_F(AggExecutorTestRunner, AggMerge) try { - std::vector tables{"big_table_1", "big_table_2", "big_table_3"}; + std::vector tables{"big_table_1", "big_table_2", "big_table_3", "big_table_4"}; for (const auto & table : tables) { auto request = context @@ -552,5 +566,45 @@ try } CATCH +TEST_F(AggExecutorTestRunner, SplitAggOutput) +try +{ + std::vector tables{"big_table_1", "big_table_2", "big_table_3", "big_table_4"}; + std::vector max_block_sizes{1, 2, 9, 19, 40, DEFAULT_BLOCK_SIZE}; + std::vector concurrences{1, 2, 10}; + std::vector expect_rows{15, 200, 1, 1024}; + for (size_t i = 0; i < tables.size(); ++i) + { + auto request = context + .scan("test_db", tables[i]) + .aggregation({Max(col("value"))}, {col("key")}) + .build(context); + context.context.setSetting("group_by_two_level_threshold_bytes", Field(static_cast(0))); + // 0: use one level + // 1: use two level + std::vector two_level_thresholds{0, 1}; + for (auto two_level_threshold : two_level_thresholds) + { + for (auto block_size : max_block_sizes) + { + for (auto concurrency : concurrences) + { + context.context.setSetting("group_by_two_level_threshold", Field(static_cast(two_level_threshold))); + context.context.setSetting("max_block_size", Field(static_cast(block_size))); + auto blocks = getExecuteStreamsReturnBlocks(request, concurrency); + size_t actual_row = 0; + for (auto & block : blocks) + { + ASSERT(block.rows() <= block_size); + actual_row += block.rows(); + } + ASSERT_EQ(actual_row, expect_rows[i]); + } + } + } + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_join_executor.cpp b/dbms/src/Flash/tests/gtest_join_executor.cpp index 954ebf610c9..977e9356f95 100644 --- a/dbms/src/Flash/tests/gtest_join_executor.cpp +++ b/dbms/src/Flash/tests/gtest_join_executor.cpp @@ -759,7 +759,7 @@ try ColumnsWithTypeAndName common_column_data; size_t table_rows = 102400; size_t common_rows = 20480; - size_t max_block_size = 800; + UInt64 max_block_size = 800; size_t original_max_streams = 20; for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(left_column_infos)) { diff --git a/dbms/src/Flash/tests/gtest_spill_sort.cpp b/dbms/src/Flash/tests/gtest_spill_sort.cpp index 34421fad380..78b7e75de6b 100644 --- a/dbms/src/Flash/tests/gtest_spill_sort.cpp +++ b/dbms/src/Flash/tests/gtest_spill_sort.cpp @@ -37,7 +37,7 @@ try DB::MockColumnInfoVec column_infos{{"a", TiDB::TP::TypeLongLong}, {"b", TiDB::TP::TypeLongLong}, {"c", TiDB::TP::TypeLongLong}, {"d", TiDB::TP::TypeLongLong}, {"e", TiDB::TP::TypeLongLong}}; ColumnsWithTypeAndName column_data; size_t table_rows = 102400; - size_t max_block_size = 500; + UInt64 max_block_size = 500; size_t original_max_streams = 20; size_t total_data_size = 0; size_t limit_size = table_rows / 10 * 9; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 57dc3e259ab..63cec7dbf64 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -920,6 +920,26 @@ Block Aggregator::convertOneBucketToBlock( return block; } +template +BlocksList Aggregator::convertOneBucketToBlocks( + AggregatedDataVariants & data_variants, + Method & method, + Arena * arena, + bool final, + size_t bucket) const +{ + BlocksList blocks = prepareBlocksAndFill(data_variants, final, method.data.impls[bucket].size(), [bucket, &method, arena, this](std::vector & key_columns_vec, std::vector & aggregate_columns_vec, std::vector & final_aggregate_columns_vec, bool final_) { + convertToBlocksImpl(method, method.data.impls[bucket], key_columns_vec, aggregate_columns_vec, final_aggregate_columns_vec, arena, final_); + }); + + for (auto & block : blocks) + { + block.info.bucket_num = bucket; + } + + return blocks; +} + template void Aggregator::writeToTemporaryFileImpl( @@ -951,7 +971,7 @@ void Aggregator::writeToTemporaryFileImpl( /// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects. data_variants.aggregator = nullptr; - LOG_TRACE(log, "Max size of temporary block: {} rows, {:.3f} MiB.", max_temporary_block_size_rows, (max_temporary_block_size_bytes / 1048576.0)); + LOG_TRACE(log, "Max size of temporary bucket blocks: {} rows, {:.3f} MiB.", max_temporary_block_size_rows, (max_temporary_block_size_bytes / 1048576.0)); } @@ -1050,6 +1070,44 @@ void Aggregator::convertToBlockImpl( data.clearAndShrink(); } +template +void Aggregator::convertToBlocksImpl( + Method & method, + Table & data, + std::vector & key_columns_vec, + std::vector & aggregate_columns_vec, + std::vector & final_aggregate_columns_vec, + Arena * arena, + bool final) const +{ + if (data.empty()) + return; + + std::vector> raw_key_columns_vec; + raw_key_columns_vec.reserve(key_columns_vec.size()); + for (auto & key_columns : key_columns_vec) + { + RUNTIME_CHECK_MSG(key_columns.size() == params.keys_size, "Aggregate. Unexpected key columns size."); + + std::vector raw_key_columns; + raw_key_columns.reserve(key_columns.size()); + for (auto & column : key_columns) + { + raw_key_columns.push_back(column.get()); + } + + raw_key_columns_vec.push_back(raw_key_columns); + } + + if (final) + convertToBlocksImplFinal(method, data, std::move(raw_key_columns_vec), final_aggregate_columns_vec, arena); + else + convertToBlocksImplNotFinal(method, data, std::move(raw_key_columns_vec), aggregate_columns_vec); + + /// In order to release memory early. + data.clearAndShrink(); +} + template inline void Aggregator::insertAggregatesIntoColumns( @@ -1200,6 +1258,30 @@ void NO_INLINE Aggregator::convertToBlockImplFinal( }); } +template +void NO_INLINE Aggregator::convertToBlocksImplFinal( + Method & method, + Table & data, + std::vector> key_columns_vec, + std::vector & final_aggregate_columns_vec, + Arena * arena) const +{ + assert(!key_columns_vec.empty()); + auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns_vec[0], key_sizes); + const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes; + + AggregatorMethodInitKeyColumnHelper agg_keys_helper{method}; + agg_keys_helper.initAggKeys(data.size(), key_columns_vec[0]); + + size_t data_index = 0; + data.forEachValue([&](const auto & key, auto & mapped) { + size_t key_columns_vec_index = data_index / params.max_block_size; + agg_keys_helper.insertKeyIntoColumns(key, key_columns_vec[key_columns_vec_index], key_sizes_ref, params.collators); + insertAggregatesIntoColumns(mapped, final_aggregate_columns_vec[key_columns_vec_index], arena); + data_index++; + }); +} + template void NO_INLINE Aggregator::convertToBlockImplNotFinal( Method & method, @@ -1224,6 +1306,32 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal( }); } +template +void NO_INLINE Aggregator::convertToBlocksImplNotFinal( + Method & method, + Table & data, + std::vector> key_columns_vec, + std::vector & aggregate_columns_vec) const +{ + auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns_vec[0], key_sizes); + const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes; + + AggregatorMethodInitKeyColumnHelper agg_keys_helper{method}; + agg_keys_helper.initAggKeys(data.size(), key_columns_vec[0]); + + size_t data_index = 0; + data.forEachValue([&](const auto & key, auto & mapped) { + size_t key_columns_vec_index = data_index / params.max_block_size; + agg_keys_helper.insertKeyIntoColumns(key, key_columns_vec[key_columns_vec_index], key_sizes_ref, params.collators); + + /// reserved, so push_back does not throw exceptions + for (size_t i = 0; i < params.aggregates_size; ++i) + aggregate_columns_vec[key_columns_vec_index][i]->push_back(mapped + offsets_of_aggregate_states[i]); + + data_index++; + mapped = nullptr; + }); +} template Block Aggregator::prepareBlockAndFill( @@ -1301,6 +1409,114 @@ Block Aggregator::prepareBlockAndFill( return res; } +template +BlocksList Aggregator::prepareBlocksAndFill( + AggregatedDataVariants & data_variants, + bool final, + size_t rows, + Filler && filler) const +{ + Block header = getHeader(final); + + size_t block_count = (rows + params.max_block_size - 1) / params.max_block_size; + std::vector key_columns_vec; + std::vector aggregate_columns_data_vec; + std::vector aggregate_columns_vec; + std::vector final_aggregate_columns_vec; + + size_t block_rows = params.max_block_size; + + for (size_t j = 0; j < block_count; ++j) + { + if (j == (block_count - 1) && rows % block_rows != 0) + { + block_rows = rows % block_rows; + } + + key_columns_vec.push_back(MutableColumns(params.keys_size)); + aggregate_columns_data_vec.push_back(AggregateColumnsData(params.aggregates_size)); + aggregate_columns_vec.push_back(MutableColumns(params.aggregates_size)); + final_aggregate_columns_vec.push_back(MutableColumns(params.aggregates_size)); + + auto & key_columns = key_columns_vec.back(); + auto & aggregate_columns_data = aggregate_columns_data_vec.back(); + auto & aggregate_columns = aggregate_columns_vec.back(); + auto & final_aggregate_columns = final_aggregate_columns_vec.back(); + + for (size_t i = 0; i < params.keys_size; ++i) + { + key_columns[i] = header.safeGetByPosition(i).type->createColumn(); + key_columns[i]->reserve(block_rows); + } + + for (size_t i = 0; i < params.aggregates_size; ++i) + { + if (!final) + { + const auto & aggregate_column_name = params.aggregates[i].column_name; + aggregate_columns[i] = header.getByName(aggregate_column_name).type->createColumn(); + + /// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states. + auto & column_aggregate_func = assert_cast(*aggregate_columns[i]); + + for (auto & pool : data_variants.aggregates_pools) + column_aggregate_func.addArena(pool); + + aggregate_columns_data[i] = &column_aggregate_func.getData(); + aggregate_columns_data[i]->reserve(block_rows); + } + else + { + final_aggregate_columns[i] = aggregate_functions[i]->getReturnType()->createColumn(); + final_aggregate_columns[i]->reserve(block_rows); + + if (aggregate_functions[i]->isState()) + { + /// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states. + if (auto * column_aggregate_func = typeid_cast(final_aggregate_columns[i].get())) + for (auto & pool : data_variants.aggregates_pools) + column_aggregate_func->addArena(pool); + } + } + } + } + + filler(key_columns_vec, aggregate_columns_data_vec, final_aggregate_columns_vec, final); + + BlocksList res_list; + block_rows = params.max_block_size; + for (size_t j = 0; j < block_count; ++j) + { + Block res = header.cloneEmpty(); + + for (size_t i = 0; i < params.keys_size; ++i) + res.getByPosition(i).column = std::move(key_columns_vec[j][i]); + + for (size_t i = 0; i < params.aggregates_size; ++i) + { + const auto & aggregate_column_name = params.aggregates[i].column_name; + if (final) + res.getByName(aggregate_column_name).column = std::move(final_aggregate_columns_vec[j][i]); + else + res.getByName(aggregate_column_name).column = std::move(aggregate_columns_vec[j][i]); + } + + if (j == (block_count - 1) && rows % block_rows != 0) + { + block_rows = rows % block_rows; + } + + /// Change the size of the columns-constants in the block. + size_t columns = header.columns(); + for (size_t i = 0; i < columns; ++i) + if (res.getByPosition(i).column->isColumnConst()) + res.getByPosition(i).column = res.getByPosition(i).column->cut(0, block_rows); + + res_list.push_back(res); + } + + return res_list; +} Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final) const { @@ -1314,10 +1530,7 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va if (data_variants.type == AggregatedDataVariants::Type::without_key) { AggregatedDataWithoutKey & data = data_variants.without_key; - - if (!data) - throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR); - + RUNTIME_CHECK_MSG(data, "Wrong data variant passed."); if (!final_) { for (size_t i = 0; i < params.aggregates_size; ++i) @@ -1341,6 +1554,43 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va } +BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final) const +{ + size_t rows = 1; + + auto filler = [&data_variants, this]( + std::vector &, + std::vector & aggregate_columns_vec, + std::vector & final_aggregate_columns_vec, + bool final_) { + if (data_variants.type == AggregatedDataVariants::Type::without_key) + { + AggregatedDataWithoutKey & data = data_variants.without_key; + + RUNTIME_CHECK_MSG(data, "Wrong data variant passed."); + + if (!final_) + { + for (size_t i = 0; i < params.aggregates_size; ++i) + aggregate_columns_vec[0][i]->push_back(data + offsets_of_aggregate_states[i]); + data = nullptr; + } + else + { + /// Always single-thread. It's safe to pass current arena from 'aggregates_pool'. + insertAggregatesIntoColumns(data, final_aggregate_columns_vec[0], data_variants.aggregates_pool); + } + } + }; + + BlocksList blocks = prepareBlocksAndFill(data_variants, final, rows, filler); + + if (final) + destroyWithoutKey(data_variants); + + return blocks; +} + Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const { size_t rows = data_variants.size(); @@ -1375,6 +1625,40 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v return prepareBlockAndFill(data_variants, final, rows, filler); } +BlocksList Aggregator::prepareBlocksAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const +{ + size_t rows = data_variants.size(); + + auto filler = [&data_variants, this]( + std::vector & key_columns_vec, + std::vector & aggregate_columns_vec, + std::vector & final_aggregate_columns_vec, + bool final_) { +#define M(NAME) \ + case AggregationMethodType(NAME): \ + { \ + convertToBlocksImpl( \ + *ToAggregationMethodPtr(NAME, data_variants.aggregation_method_impl), \ + ToAggregationMethodPtr(NAME, data_variants.aggregation_method_impl)->data, \ + key_columns_vec, \ + aggregate_columns_vec, \ + final_aggregate_columns_vec, \ + data_variants.aggregates_pool, \ + final_); \ + break; \ + } + switch (data_variants.type) + { + APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) + default: + throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); + } +#undef M + }; + + return prepareBlocksAndFill(data_variants, final, rows, filler); +} + BlocksList Aggregator::prepareBlocksAndFillTwoLevel( AggregatedDataVariants & data_variants, diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index f79673e01c7..abae85001d6 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -902,6 +902,7 @@ class Aggregator const std::string tmp_path; + UInt64 max_block_size; TiDB::TiDBCollators collators; Params( @@ -915,6 +916,7 @@ class Aggregator size_t max_bytes_before_external_group_by_, bool empty_result_for_aggregation_by_empty_set_, const std::string & tmp_path_, + UInt64 max_block_size_, const TiDB::TiDBCollators & collators_ = TiDB::dummy_collators) : src_header(src_header_) , keys(keys_) @@ -928,6 +930,7 @@ class Aggregator , max_bytes_before_external_group_by(max_bytes_before_external_group_by_) , empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_) , tmp_path(tmp_path_) + , max_block_size(max_block_size_) , collators(collators_) { } @@ -936,8 +939,9 @@ class Aggregator Params(const Block & intermediate_header_, const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, + UInt64 max_block_size_ = DEFAULT_BLOCK_SIZE, const TiDB::TiDBCollators & collators_ = TiDB::dummy_collators) - : Params(Block(), keys_, aggregates_, 0, OverflowMode::THROW, 0, 0, 0, false, "", collators_) + : Params(Block(), keys_, aggregates_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_block_size_, collators_) { intermediate_header = intermediate_header_; } @@ -1158,6 +1162,16 @@ class Aggregator Arena * arena, bool final) const; + template + void convertToBlocksImpl( + Method & method, + Table & data, + std::vector & key_columns_vec, + std::vector & aggregate_columns_vec, + std::vector & final_aggregate_columns_vec, + Arena * arena, + bool final) const; + template void convertToBlockImplFinal( Method & method, @@ -1166,6 +1180,14 @@ class Aggregator MutableColumns & final_aggregate_columns, Arena * arena) const; + template + void convertToBlocksImplFinal( + Method & method, + Table & data, + std::vector> key_columns_vec, + std::vector & final_aggregate_columns_vec, + Arena * arena) const; + template void convertToBlockImplNotFinal( Method & method, @@ -1173,6 +1195,13 @@ class Aggregator std::vector key_columns, AggregateColumnsData & aggregate_columns) const; + template + void convertToBlocksImplNotFinal( + Method & method, + Table & data, + std::vector> key_columns_vec, + std::vector & aggregate_columns_vec) const; + template Block prepareBlockAndFill( AggregatedDataVariants & data_variants, @@ -1180,6 +1209,13 @@ class Aggregator size_t rows, Filler && filler) const; + template + BlocksList prepareBlocksAndFill( + AggregatedDataVariants & data_variants, + bool final, + size_t rows, + Filler && filler) const; + template Block convertOneBucketToBlock( AggregatedDataVariants & data_variants, @@ -1188,6 +1224,14 @@ class Aggregator bool final, size_t bucket) const; + template + BlocksList convertOneBucketToBlocks( + AggregatedDataVariants & data_variants, + Method & method, + Arena * arena, + bool final, + size_t bucket) const; + template void insertAggregatesIntoColumns( Mapped & mapped, @@ -1201,7 +1245,9 @@ class Aggregator AggregateFunctionInstructions & instructions); Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final) const; + BlocksList prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final) const; Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const; + BlocksList prepareBlocksAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const; BlocksList prepareBlocksAndFillTwoLevel( AggregatedDataVariants & data_variants, bool final, diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 6a72ab247e6..5efbb119a00 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -974,7 +974,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre */ bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0; - Aggregator::Params params(header, keys, aggregates, settings.max_rows_to_group_by, settings.group_by_overflow_mode, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, context.getTemporaryPath()); + Aggregator::Params params(header, keys, aggregates, settings.max_rows_to_group_by, settings.group_by_overflow_mode, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, context.getTemporaryPath(), settings.max_block_size); /// If there are several sources, then we perform parallel aggregation if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1)