diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 9d7a87eabf1..6358f517408 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -13,17 +13,21 @@ // limitations under the License. #include +#include #include #include #include #include #include +#include namespace DB { +namespace +{ /** Remove constant columns from block. */ -static void removeConstantsFromBlock(Block & block) +void removeConstantsFromBlock(Block & block) { size_t columns = block.columns(); size_t i = 0; @@ -39,7 +43,7 @@ static void removeConstantsFromBlock(Block & block) } } -static void removeConstantsFromSortDescription(const Block & header, SortDescription & description) +void removeConstantsFromSortDescription(const Block & header, SortDescription & description) { description.erase( std::remove_if(description.begin(), description.end(), [&](const SortColumnDescription & elem) { @@ -54,7 +58,7 @@ static void removeConstantsFromSortDescription(const Block & header, SortDescrip /** Add into block, whose constant columns was removed by previous function, * constant columns from header (which must have structure as before removal of constants from block). */ -static void enrichBlockWithConstants(Block & block, const Block & header) +void enrichBlockWithConstants(Block & block, const Block & header) { size_t rows = block.rows(); size_t columns = header.columns(); @@ -66,7 +70,7 @@ static void enrichBlockWithConstants(Block & block, const Block & header) block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name}); } } - +} // namespace MergeSortingBlockInputStream::MergeSortingBlockInputStream( const BlockInputStreamPtr & input, @@ -177,110 +181,8 @@ Block MergeSortingBlockInputStream::readImpl() return res; } - -MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( - Blocks & blocks_, - SortDescription & description_, - const String & req_id, - size_t max_merged_block_size_, - size_t limit_) - : blocks(blocks_) - , header(blocks.at(0).cloneEmpty()) - , description(description_) - , max_merged_block_size(max_merged_block_size_) - , limit(limit_) - , log(Logger::get(req_id)) -{ - Blocks nonempty_blocks; - for (const auto & block : blocks) - { - if (block.rows() == 0) - continue; - - nonempty_blocks.push_back(block); - cursors.emplace_back(block, description); - has_collation |= cursors.back().has_collation; - } - - blocks.swap(nonempty_blocks); - - if (!has_collation) - { - for (auto & cursor : cursors) - queue.push(SortCursor(&cursor)); - } - else - { - for (auto & cursor : cursors) - queue_with_collation.push(SortCursorWithCollation(&cursor)); - } -} - - -Block MergeSortingBlocksBlockInputStream::readImpl() -{ - if (blocks.empty()) - return Block(); - - if (blocks.size() == 1) - { - Block res = blocks[0]; - blocks.clear(); - return res; - } - - return !has_collation - ? mergeImpl(queue) - : mergeImpl(queue_with_collation); -} - - -template -Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue & queue) -{ - size_t num_columns = blocks[0].columns(); - - MutableColumns merged_columns = blocks[0].cloneEmptyColumns(); - /// TODO: reserve (in each column) - - /// Take rows from queue in right order and push to 'merged'. - size_t merged_rows = 0; - while (!queue.empty()) - { - TSortCursor current = queue.top(); - queue.pop(); - - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); - - if (!current->isLast()) - { - current->next(); - queue.push(current); - } - - ++total_merged_rows; - if (limit && total_merged_rows == limit) - { - auto res = blocks[0].cloneWithColumns(std::move(merged_columns)); - blocks.clear(); - return res; - } - - ++merged_rows; - if (merged_rows == max_merged_block_size) - return blocks[0].cloneWithColumns(std::move(merged_columns)); - } - - if (merged_rows == 0) - return {}; - - return blocks[0].cloneWithColumns(std::move(merged_columns)); -} - void MergeSortingBlockInputStream::appendInfo(FmtBuffer & buffer) const { buffer.fmtAppend(", limit = {}", limit); } - } // namespace DB diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index dd91576f8e1..3b75c80d472 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -21,69 +22,10 @@ #include #include #include -#include - -#include namespace DB { -/** Merges stream of sorted each-separately blocks to sorted as-a-whole stream of blocks. - * If data to sort is too much, could use external sorting, with temporary files. - */ - -/** Part of implementation. Merging array of ready (already read from somewhere) blocks. - * Returns result of merge as stream of blocks, not more than 'max_merged_block_size' rows in each. - */ -class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream -{ - static constexpr auto NAME = "MergeSortingBlocks"; - -public: - /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. - MergeSortingBlocksBlockInputStream( - Blocks & blocks_, - SortDescription & description_, - const String & req_id, - size_t max_merged_block_size_, - size_t limit_ = 0); - - String getName() const override { return NAME; } - - bool isGroupedOutput() const override { return true; } - bool isSortedOutput() const override { return true; } - const SortDescription & getSortDescription() const override { return description; } - - Block getHeader() const override { return header; } - -protected: - Block readImpl() override; - -private: - Blocks & blocks; - Block header; - SortDescription description; - size_t max_merged_block_size; - size_t limit; - size_t total_merged_rows = 0; - - using CursorImpls = std::vector; - CursorImpls cursors; - - bool has_collation = false; - - std::priority_queue queue; - std::priority_queue queue_with_collation; - - /** Two different cursors are supported - with and without Collation. - * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. - */ - template - Block mergeImpl(std::priority_queue & queue); - - LoggerPtr log; -}; - class MergeSortingBlockInputStream : public IProfilingBlockInputStream { static constexpr auto NAME = "MergeSorting"; diff --git a/dbms/src/DataStreams/MergeSortingBlocksBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlocksBlockInputStream.cpp new file mode 100644 index 00000000000..521b5e42eaa --- /dev/null +++ b/dbms/src/DataStreams/MergeSortingBlocksBlockInputStream.cpp @@ -0,0 +1,118 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB +{ +MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( + Blocks & blocks_, + SortDescription & description_, + const String & req_id, + size_t max_merged_block_size_, + size_t limit_) + : blocks(blocks_) + , header(blocks.at(0).cloneEmpty()) + , description(description_) + , max_merged_block_size(max_merged_block_size_) + , limit(limit_) + , log(Logger::get(req_id)) +{ + Blocks nonempty_blocks; + for (const auto & block : blocks) + { + if (block.rows() == 0) + continue; + + nonempty_blocks.push_back(block); + cursors.emplace_back(block, description); + has_collation |= cursors.back().has_collation; + } + + blocks.swap(nonempty_blocks); + + if (!has_collation) + { + for (auto & cursor : cursors) + queue.push(SortCursor(&cursor)); + } + else + { + for (auto & cursor : cursors) + queue_with_collation.push(SortCursorWithCollation(&cursor)); + } +} + + +Block MergeSortingBlocksBlockInputStream::readImpl() +{ + if (blocks.empty()) + return Block(); + + if (blocks.size() == 1) + { + Block res = blocks[0]; + blocks.clear(); + return res; + } + + return !has_collation + ? mergeImpl(queue) + : mergeImpl(queue_with_collation); +} + + +template +Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue & queue) +{ + size_t num_columns = blocks[0].columns(); + + MutableColumns merged_columns = blocks[0].cloneEmptyColumns(); + /// TODO: reserve (in each column) + + /// Take rows from queue in right order and push to 'merged'. + size_t merged_rows = 0; + while (!queue.empty()) + { + TSortCursor current = queue.top(); + queue.pop(); + + for (size_t i = 0; i < num_columns; ++i) + merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); + + if (!current->isLast()) + { + current->next(); + queue.push(current); + } + + ++total_merged_rows; + if (limit && total_merged_rows == limit) + { + auto res = blocks[0].cloneWithColumns(std::move(merged_columns)); + blocks.clear(); + return res; + } + + ++merged_rows; + if (merged_rows == max_merged_block_size) + return blocks[0].cloneWithColumns(std::move(merged_columns)); + } + + if (merged_rows == 0) + return {}; + + return blocks[0].cloneWithColumns(std::move(merged_columns)); +} +} // namespace DB diff --git a/dbms/src/DataStreams/MergeSortingBlocksBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlocksBlockInputStream.h new file mode 100644 index 00000000000..7656241436c --- /dev/null +++ b/dbms/src/DataStreams/MergeSortingBlocksBlockInputStream.h @@ -0,0 +1,81 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include + +namespace DB +{ +/** Merges stream of sorted each-separately blocks to sorted as-a-whole stream of blocks. + * If data to sort is too much, could use external sorting, with temporary files. + */ + +/** Part of implementation. Merging array of ready (already read from somewhere) blocks. + * Returns result of merge as stream of blocks, not more than 'max_merged_block_size' rows in each. + */ +class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream +{ + static constexpr auto NAME = "MergeSortingBlocks"; + +public: + /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. + MergeSortingBlocksBlockInputStream( + Blocks & blocks_, + SortDescription & description_, + const String & req_id, + size_t max_merged_block_size_, + size_t limit_ = 0); + + String getName() const override { return NAME; } + + bool isGroupedOutput() const override { return true; } + bool isSortedOutput() const override { return true; } + const SortDescription & getSortDescription() const override { return description; } + + Block getHeader() const override { return header; } + +protected: + Block readImpl() override; + +private: + Blocks & blocks; + Block header; + SortDescription description; + size_t max_merged_block_size; + size_t limit; + size_t total_merged_rows = 0; + + using CursorImpls = std::vector; + CursorImpls cursors; + + bool has_collation = false; + + std::priority_queue queue; + std::priority_queue queue_with_collation; + + /** Two different cursors are supported - with and without Collation. + * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. + */ + template + Block mergeImpl(std::priority_queue & queue); + + LoggerPtr log; +}; +} // namespace DB diff --git a/dbms/src/Interpreters/MergingAndConvertingBlockInputStream.h b/dbms/src/DataStreams/MergingAndConvertingBlockInputStream.h similarity index 100% rename from dbms/src/Interpreters/MergingAndConvertingBlockInputStream.h rename to dbms/src/DataStreams/MergingAndConvertingBlockInputStream.h diff --git a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp index 4f033e6afe5..b95f847c42a 100644 --- a/dbms/src/Flash/tests/gtest_aggregation_executor.cpp +++ b/dbms/src/Flash/tests/gtest_aggregation_executor.cpp @@ -134,6 +134,50 @@ class ExecutorAggTestRunner : public ExecutorTest toVec("c4_str", {"1", "2 ", "2 "}), toVec("c5_date_time", {2000000, 12000000, 12000000}), }); + + /// agg table with 200 rows + { + // with 15 types of key. + std::vector::FieldType>> key(200); + std::vector> value(200); + for (size_t i = 0; i < 200; ++i) + { + key[i] = i % 15; + value[i] = {fmt::format("val_{}", i)}; + } + context.addMockTable( + {"test_db", "big_table_1"}, + {{"key", TiDB::TP::TypeLong}, {"value", TiDB::TP::TypeString}}, + {toNullableVec("key", key), toNullableVec("value", value)}); + } + { + // with 200 types of key. + std::vector::FieldType>> key(200); + std::vector> value(200); + for (size_t i = 0; i < 200; ++i) + { + key[i] = i; + value[i] = {fmt::format("val_{}", i)}; + } + context.addMockTable( + {"test_db", "big_table_2"}, + {{"key", TiDB::TP::TypeLong}, {"value", TiDB::TP::TypeString}}, + {toNullableVec("key", key), toNullableVec("value", value)}); + } + { + // with 1 types of key. + std::vector::FieldType>> key(200); + std::vector> value(200); + for (size_t i = 0; i < 200; ++i) + { + key[i] = 0; + value[i] = {fmt::format("val_{}", i)}; + } + context.addMockTable( + {"test_db", "big_table_3"}, + {{"key", TiDB::TP::TypeLong}, {"value", TiDB::TP::TypeString}}, + {toNullableVec("key", key), toNullableVec("value", value)}); + } } std::shared_ptr buildDAGRequest(std::pair src, MockAstVec agg_funcs, MockAstVec group_by_exprs, MockColumnNameVec proj) @@ -342,6 +386,9 @@ try } CATCH +// TODO support more type of min, max, count. +// support more aggregation functions: sum, forst_row, group_concat + TEST_F(ExecutorAggTestRunner, AggregationCountGroupByFastPathMultiKeys) try { @@ -510,8 +557,29 @@ try } CATCH -// TODO support more type of min, max, count. -// support more aggregation functions: sum, forst_row, group_concat +TEST_F(ExecutorAggTestRunner, AggMerge) +try +{ + std::vector tables{"big_table_1", "big_table_2", "big_table_3"}; + for (const auto & table : tables) + { + auto request = context + .scan("test_db", table) + .aggregation({Max(col("value"))}, {col("key")}) + .build(context); + auto expect = executeStreams(request, 1); + context.context.setSetting("group_by_two_level_threshold_bytes", Field(static_cast(0))); + // 0: use one level merge + // 1: use two level merge + std::vector two_level_thresholds{0, 1}; + for (auto two_level_threshold : two_level_thresholds) + { + context.context.setSetting("group_by_two_level_threshold", Field(static_cast(two_level_threshold))); + executeAndAssertColumnsEqual(request, expect); + } + } +} +CATCH } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_projection_executor.cpp b/dbms/src/Flash/tests/gtest_projection_executor.cpp index d7e90b6cb51..599044f5503 100644 --- a/dbms/src/Flash/tests/gtest_projection_executor.cpp +++ b/dbms/src/Flash/tests/gtest_projection_executor.cpp @@ -343,7 +343,7 @@ try .scan("test_db", "test_table3") .project({lit(Field(String("a")))}) .build(context); - executeAndAssertColumnsEqual(req, {createColumns({toVec({"a", "a", "a", "a", "a"})})}); + executeAndAssertColumnsEqual(req, {createColumns({createConstColumn(5, "a")})}); req = context .scan("test_db", "test_table3") @@ -351,7 +351,7 @@ try .project(MockAstVec{}) .project({lit(Field(String("a")))}) .build(context); - executeAndAssertColumnsEqual(req, {createColumns({toVec({"a", "a", "a", "a", "a"})})}); + executeAndAssertColumnsEqual(req, {createColumns({createConstColumn(5, "a")})}); req = context .scan("test_db", "test_table3") @@ -359,7 +359,7 @@ try .project(MockAstVec{}) .project({lit(Field(String("a")))}) .build(context); - executeAndAssertColumnsEqual(req, {createColumns({toVec({"a", "a", "a", "a", "a"})})}); + executeAndAssertColumnsEqual(req, {createColumns({createConstColumn(5, "a")})}); req = context .scan("test_db", "test_table3") diff --git a/dbms/src/Flash/tests/gtest_topn_executor.cpp b/dbms/src/Flash/tests/gtest_topn_executor.cpp index 461f0e8b4c9..407676c8a3e 100644 --- a/dbms/src/Flash/tests/gtest_topn_executor.cpp +++ b/dbms/src/Flash/tests/gtest_topn_executor.cpp @@ -19,7 +19,6 @@ namespace DB { namespace tests { - class ExecutorTopNTestRunner : public DB::tests::ExecutorTest { public: @@ -45,6 +44,38 @@ class ExecutorTopNTestRunner : public DB::tests::ExecutorTest toNullableVec(col_name[1], col_gender), toNullableVec(col_name[2], col_country), toNullableVec(col_name[3], col_salary)}); + + /// table with 200 rows + { + // with 15 types of key. + std::vector::FieldType>> key(200); + for (size_t i = 0; i < 200; ++i) + key[i] = i % 15; + context.addMockTable( + {"test_db", "big_table_1"}, + {{"key", TiDB::TP::TypeLong}}, + {toNullableVec("key", key)}); + } + { + // with 200 types of key. + std::vector::FieldType>> key(200); + for (size_t i = 0; i < 200; ++i) + key[i] = i; + context.addMockTable( + {"test_db", "big_table_2"}, + {{"key", TiDB::TP::TypeLong}}, + {toNullableVec("key", key)}); + } + { + // with 1 types of key. + std::vector::FieldType>> key(200); + for (size_t i = 0; i < 200; ++i) + key[i] = 0; + context.addMockTable( + {"test_db", "big_table_3"}, + {{"key", TiDB::TP::TypeLong}}, + {toNullableVec("key", key)}); + } } std::shared_ptr buildDAGRequest(const String & table_name, const String & col_name, bool is_desc, int limit_num) @@ -214,5 +245,25 @@ try } CATCH +TEST_F(ExecutorTopNTestRunner, BigTable) +try +{ + std::vector tables{"big_table_1", "big_table_2", "big_table_3"}; + for (const auto & table : tables) + { + std::vector limits{0, 1, 10, 20, 199, 200, 300}; + for (auto limit_num : limits) + { + auto request = context + .scan("test_db", table) + .topN("key", false, limit_num) + .build(context); + auto expect = executeStreams(request, 1); + executeAndAssertColumnsEqual(request, expect); + } + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 4a9ffc1c993..ba7d1b5e32a 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -32,7 +33,6 @@ #include #include #include -#include #include #include diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index a890c868d0b..85c54c5e7c0 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -303,22 +303,7 @@ Block SegmentTestBasic::prepareWriteBlock(Int64 start_key, Int64 end_key, bool i Block sortMergeBlocks(std::vector && blocks) { - auto accumulated_block = std::move(blocks[0]); - - for (size_t block_idx = 1; block_idx < blocks.size(); ++block_idx) - { - auto block = std::move(blocks[block_idx]); - - size_t columns = block.columns(); - size_t rows = block.rows(); - - for (size_t i = 0; i < columns; ++i) - { - MutableColumnPtr mutable_column = (*std::move(accumulated_block.getByPosition(i).column)).mutate(); - mutable_column->insertRangeFrom(*block.getByPosition(i).column, 0, rows); - accumulated_block.getByPosition(i).column = std::move(mutable_column); - } - } + auto accumulated_block = mergeBlocks(std::move(blocks)); SortDescription sort; sort.emplace_back(EXTRA_HANDLE_COLUMN_NAME, 1, 0); diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 7719d021b37..7ca140ee650 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -166,34 +166,6 @@ void ExecutorTest::executeAndAssertRowsEqual(const std::shared_ptr actual_cols; - for (const auto & column : sample_block.getColumnsWithTypeAndName()) - { - actual_cols.push_back(column.type->createColumn()); - } - for (const auto & block : blocks) - { - for (size_t i = 0; i < block.columns(); ++i) - { - for (size_t j = 0; j < block.rows(); ++j) - { - actual_cols[i]->insert((*(block.getColumnsWithTypeAndName())[i].column)[j]); - } - } - } - - ColumnsWithTypeAndName actual_columns; - for (size_t i = 0; i < actual_cols.size(); ++i) - actual_columns.push_back({std::move(actual_cols[i]), sample_block.getColumnsWithTypeAndName()[i].type, sample_block.getColumnsWithTypeAndName()[i].name, sample_block.getColumnsWithTypeAndName()[i].column_id}); - return Block(actual_columns); -} - void readStream(Blocks & blocks, BlockInputStreamPtr stream) { stream->readPrefix(); @@ -214,7 +186,7 @@ DB::ColumnsWithTypeAndName readBlocks(std::vector streams) Blocks actual_blocks; for (const auto & stream : streams) readStream(actual_blocks, stream); - return mergeBlocksForTest(std::move(actual_blocks)).getColumnsWithTypeAndName(); + return mergeBlocks(std::move(actual_blocks)).getColumnsWithTypeAndName(); } void ExecutorTest::enablePlanner(bool is_enable) @@ -238,7 +210,7 @@ ColumnsWithTypeAndName ExecutorTest::executeStreams(DAGContext * dag_context) // Currently, don't care about regions information in tests. Blocks blocks; queryExecute(context.context, /*internal=*/true)->execute([&blocks](const Block & block) { blocks.push_back(block); }).verify(); - return mergeBlocksForTest(std::move(blocks)).getColumnsWithTypeAndName(); + return mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName(); } Blocks ExecutorTest::getExecuteStreamsReturnBlocks(const std::shared_ptr & request, size_t concurrency)