From 3c7e1219961960b22ebfed39d8b069713e5ba543 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Thu, 22 Dec 2022 15:35:33 +0800 Subject: [PATCH 01/15] merge output block if it too small in hash join --- dbms/src/Core/Block.cpp | 20 ++++++ dbms/src/Core/Block.h | 2 + .../HashJoinProbeBlockInputStream.cpp | 71 ++++++++++++++++++- .../HashJoinProbeBlockInputStream.h | 4 ++ dbms/src/Flash/tests/gtest_join_executor.cpp | 21 +++++- dbms/src/Interpreters/Join.h | 2 +- 6 files changed, 115 insertions(+), 5 deletions(-) diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 69fc45ec3c1..b5ec40a1272 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -514,6 +514,26 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons return ReturnType(true); } +Block blocksMerge(Blocks && result_blocks) +{ + auto & sample_block = result_blocks[0]; + MutableColumns dst_columns(sample_block.columns()); + for (size_t i = 0; i < sample_block.columns(); i++) + { + dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty(); + } + for (auto & current_block : result_blocks) + { + if (current_block.rows() > 0) + { + for (size_t column = 0; column < current_block.columns(); column++) + { + dst_columns[column]->insertRangeFrom(*current_block.getByPosition(column).column, 0, current_block.rows()); + } + } + } + return sample_block.cloneWithColumns(std::move(dst_columns)); +} bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs) { diff --git a/dbms/src/Core/Block.h b/dbms/src/Core/Block.h index 206d6d959cc..c8c9c8aa865 100644 --- a/dbms/src/Core/Block.h +++ b/dbms/src/Core/Block.h @@ -149,6 +149,7 @@ class Block */ void updateHash(SipHash & hash) const; + private: void eraseImpl(size_t position); void initializeIndexByName(); @@ -157,6 +158,7 @@ class Block using Blocks = std::vector; using BlocksList = std::list; +Block blocksMerge(Blocks && result_blocks); /// Compare number of columns, data types, column types, column names, and values of constant columns. bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs); diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index b7ae64cfafc..40156ad5735 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -25,6 +25,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( : log(Logger::get(req_id)) , join(join_) , probe_process_info(max_block_size) + , join_finished(false) { children.push_back(input); @@ -67,17 +68,81 @@ Block HashJoinProbeBlockInputStream::getHeader() const Block HashJoinProbeBlockInputStream::readImpl() { - if (probe_process_info.all_rows_joined_finish) + // if join finished, return {} + if (join_finished) + { + return Block{}; + } + + Blocks result_blocks; + size_t output_rows = 0; + + // if over_limit_block is not null, we need to push it into result_blocks first. + if (over_limit_block) + { + result_blocks.push_back(over_limit_block); + output_rows += over_limit_block.rows(); + over_limit_block = Block{}; + } + + while (output_rows <= probe_process_info.max_block_size) + { + Block result_block = getOutputBlock(probe_process_info); + + if (!result_block) + { + // if result blocks is not empty, merge and return them, then mark join finished. + if (!result_blocks.empty()) + { + join_finished = true; + return mergeResultBlocks(std::move(result_blocks)); + } + // if result blocks is empty, return result block directly. + return result_block; + } + size_t current_rows = result_block.rows(); + + if (!output_rows || output_rows + current_rows <= probe_process_info.max_block_size) + { + result_blocks.push_back(result_block); + } + else + { + // if output_rows + current_rows > max block size, put the current result block into over_limit_block and handle it in next read. + over_limit_block = result_block; + } + output_rows += current_rows; + } + + return mergeResultBlocks(std::move(result_blocks)); +} + +Block HashJoinProbeBlockInputStream::getOutputBlock(ProbeProcessInfo & probe_process_info_) const +{ + if (probe_process_info_.all_rows_joined_finish) { Block block = children.back()->read(); if (!block) + { return block; + } join->checkTypes(block); - probe_process_info.resetBlock(std::move(block)); + probe_process_info_.resetBlock(std::move(block)); } - return join->joinBlock(probe_process_info); + return join->joinBlock(probe_process_info_); } +Block HashJoinProbeBlockInputStream::mergeResultBlocks(Blocks && result_blocks) +{ + if (result_blocks.size() == 1) + { + return result_blocks[0]; + } + else + { + return blocksMerge(std::move(result_blocks)); + } +} } // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index 3cc6fc4af6b..b8372c14f66 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -44,6 +44,8 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream String getName() const override { return name; } Block getTotals() override; Block getHeader() const override; + Block getOutputBlock(ProbeProcessInfo & probe_process_info) const; + static Block mergeResultBlocks(Blocks && result_blocks); protected: Block readImpl() override; @@ -52,6 +54,8 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream const LoggerPtr log; JoinPtr join; ProbeProcessInfo probe_process_info; + Block over_limit_block; + bool join_finished; }; } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_join_executor.cpp b/dbms/src/Flash/tests/gtest_join_executor.cpp index b7e3ff58683..e20a19b3174 100644 --- a/dbms/src/Flash/tests/gtest_join_executor.cpp +++ b/dbms/src/Flash/tests/gtest_join_executor.cpp @@ -700,7 +700,7 @@ CATCH TEST_F(JoinExecutorTestRunner, SplitJoinResult) try { - context.addMockTable("split_test", "t1", {{"a", TiDB::TP::TypeLong}}, {toVec("a", {1, 1, 1, 1, 1, 1, 1, 1, 1, 1})}); + context.addMockTable("split_test", "t1", {{"a", TiDB::TP::TypeLong}, {"b", TiDB::TP::TypeLong}}, {toVec("a", {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}), toVec("b", {1, 1, 3, 3, 1, 1, 3, 3, 1, 3})}); context.addMockTable("split_test", "t2", {{"a", TiDB::TP::TypeLong}}, {toVec("a", {1, 1, 1, 1, 1})}); auto request = context @@ -720,6 +720,25 @@ try ASSERT_EQ(expect[i][j], blocks[j].rows()); } } + + // with other condition + const auto cond = gt(col("b"), lit(Field(static_cast(2)))); + request = context + .scan("split_test", "t1") + .join(context.scan("split_test", "t2"), tipb::JoinType::TypeInnerJoin, {col("a")}, {}, {}, {cond}, {}) + + .build(context); + expect = {{5, 5, 5, 5, 5}, {5, 5, 5, 5, 5}, {5, 5, 5, 5, 5}, {25}, {25}, {25}, {25}, {25}}; + for (size_t i = 0; i < block_sizes.size(); ++i) + { + context.context.setSetting("max_block_size", Field(static_cast(block_sizes[i]))); + auto blocks = getExecuteStreamsReturnBlocks(request); + ASSERT_EQ(expect[i].size(), blocks.size()); + for (size_t j = 0; j < blocks.size(); ++j) + { + ASSERT_EQ(expect[i][j], blocks[j].rows()); + } + } } CATCH diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index ae86f78c1f4..9caae3269e0 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -120,7 +120,7 @@ class Join /** Join data from the map (that was previously built by calls to insertFromBlock) to the block with data from "left" table. * Could be called from different threads in parallel. */ - Block joinBlock(ProbeProcessInfo & probe_process_info) const; + Block joinBlock(ProbeProcessInfo & probe_process_info_) const; void checkTypes(const Block & block) const; From 4eeae4f8dd9df36954f510d411ac0a3fb2451b7d Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Thu, 22 Dec 2022 16:44:33 +0800 Subject: [PATCH 02/15] fix param name inconsistent --- dbms/src/DataStreams/HashJoinProbeBlockInputStream.h | 2 +- dbms/src/Interpreters/Join.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index b8372c14f66..eff2323f65f 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -44,7 +44,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream String getName() const override { return name; } Block getTotals() override; Block getHeader() const override; - Block getOutputBlock(ProbeProcessInfo & probe_process_info) const; + Block getOutputBlock(ProbeProcessInfo & probe_process_info_) const; static Block mergeResultBlocks(Blocks && result_blocks); protected: diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 9caae3269e0..ae86f78c1f4 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -120,7 +120,7 @@ class Join /** Join data from the map (that was previously built by calls to insertFromBlock) to the block with data from "left" table. * Could be called from different threads in parallel. */ - Block joinBlock(ProbeProcessInfo & probe_process_info_) const; + Block joinBlock(ProbeProcessInfo & probe_process_info) const; void checkTypes(const Block & block) const; From 5528dce98e173a566d7231390c90703ff054deb0 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Mon, 26 Dec 2022 10:58:30 +0800 Subject: [PATCH 03/15] modify according to comments --- dbms/src/Core/Block.cpp | 2 +- dbms/src/Core/Block.h | 2 +- dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp | 6 +++--- dbms/src/DataStreams/HashJoinProbeBlockInputStream.h | 1 + .../Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp | 4 ++-- dbms/src/TestUtils/ExecutorTestUtils.cpp | 6 +++--- dbms/src/TestUtils/ExecutorTestUtils.h | 2 +- dbms/src/TestUtils/MPPTaskTestUtils.cpp | 2 +- 8 files changed, 13 insertions(+), 12 deletions(-) diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index b5ec40a1272..963b5bea4cd 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -514,7 +514,7 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons return ReturnType(true); } -Block blocksMerge(Blocks && result_blocks) +Block mergeBlocks(Blocks && result_blocks) { auto & sample_block = result_blocks[0]; MutableColumns dst_columns(sample_block.columns()); diff --git a/dbms/src/Core/Block.h b/dbms/src/Core/Block.h index c8c9c8aa865..4ee5eeeb9b5 100644 --- a/dbms/src/Core/Block.h +++ b/dbms/src/Core/Block.h @@ -158,7 +158,7 @@ class Block using Blocks = std::vector; using BlocksList = std::list; -Block blocksMerge(Blocks && result_blocks); +Block mergeBlocks(Blocks && result_blocks); /// Compare number of columns, data types, column types, column names, and values of constant columns. bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs); diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 40156ad5735..d7014a96739 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -74,14 +74,14 @@ Block HashJoinProbeBlockInputStream::readImpl() return Block{}; } - Blocks result_blocks; + result_blocks.clear(); size_t output_rows = 0; // if over_limit_block is not null, we need to push it into result_blocks first. if (over_limit_block) { - result_blocks.push_back(over_limit_block); output_rows += over_limit_block.rows(); + result_blocks.push_back(std::move(over_limit_block)); over_limit_block = Block{}; } @@ -141,7 +141,7 @@ Block HashJoinProbeBlockInputStream::mergeResultBlocks(Blocks && result_blocks) } else { - return blocksMerge(std::move(result_blocks)); + return mergeBlocks(std::move(result_blocks)); } } diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index eff2323f65f..9bff9b1beed 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -54,6 +54,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream const LoggerPtr log; JoinPtr join; ProbeProcessInfo probe_process_info; + Blocks result_blocks; Block over_limit_block; bool join_finished; }; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 617a1e7f13c..a890c868d0b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -301,7 +301,7 @@ Block SegmentTestBasic::prepareWriteBlock(Int64 start_key, Int64 end_key, bool i is_deleted); } -Block mergeBlocks(std::vector && blocks) +Block sortMergeBlocks(std::vector && blocks) { auto accumulated_block = std::move(blocks[0]); @@ -391,7 +391,7 @@ Block SegmentTestBasic::prepareWriteBlockInSegmentRange(PageId segment_id, UInt6 remaining_rows); } - return mergeBlocks(std::move(blocks)); + return sortMergeBlocks(std::move(blocks)); } void SegmentTestBasic::writeSegment(PageId segment_id, UInt64 write_rows, std::optional start_at) diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 90fa363fe4d..35099474a73 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -166,7 +166,7 @@ void ExecutorTest::executeAndAssertRowsEqual(const std::shared_ptr streams) Blocks actual_blocks; for (const auto & stream : streams) readStream(actual_blocks, stream); - return mergeBlocks(actual_blocks).getColumnsWithTypeAndName(); + return mergeBlocksForTest(actual_blocks).getColumnsWithTypeAndName(); } void ExecutorTest::enablePlanner(bool is_enable) @@ -231,7 +231,7 @@ DB::ColumnsWithTypeAndName ExecutorTest::executeStreams(const std::shared_ptrexecute([&blocks](const Block & block) { blocks.push_back(block); }).verify(); - return mergeBlocks(blocks).getColumnsWithTypeAndName(); + return mergeBlocksForTest(blocks).getColumnsWithTypeAndName(); } Blocks ExecutorTest::getExecuteStreamsReturnBlocks(const std::shared_ptr & request, size_t concurrency) diff --git a/dbms/src/TestUtils/ExecutorTestUtils.h b/dbms/src/TestUtils/ExecutorTestUtils.h index acb239a96df..06f66644724 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.h +++ b/dbms/src/TestUtils/ExecutorTestUtils.h @@ -30,7 +30,7 @@ TiDB::TP dataTypeToTP(const DataTypePtr & type); ColumnsWithTypeAndName readBlock(BlockInputStreamPtr stream); ColumnsWithTypeAndName readBlocks(std::vector streams); -Block mergeBlocks(Blocks blocks); +Block mergeBlocksForTest(Blocks blocks); #define WRAP_FOR_DIS_ENABLE_PLANNER_BEGIN \ diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.cpp b/dbms/src/TestUtils/MPPTaskTestUtils.cpp index 6d7275a19a0..25fb4f70383 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.cpp +++ b/dbms/src/TestUtils/MPPTaskTestUtils.cpp @@ -93,7 +93,7 @@ ColumnsWithTypeAndName extractColumns(Context & context, const std::shared_ptrchunks()) blocks.emplace_back(codec->decode(chunk.rows_data(), schema)); - return mergeBlocks(blocks).getColumnsWithTypeAndName(); + return mergeBlocksForTest(blocks).getColumnsWithTypeAndName(); } ColumnsWithTypeAndName MPPTaskTestUtils::executeCoprocessorTask(std::shared_ptr & dag_request) From 17c3a48f89a7e69556a5a094e7e28e3810d5223f Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Mon, 26 Dec 2022 12:04:35 +0800 Subject: [PATCH 04/15] change param name --- dbms/src/Core/Block.cpp | 6 +++--- dbms/src/Core/Block.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 963b5bea4cd..a6d96153090 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -514,15 +514,15 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons return ReturnType(true); } -Block mergeBlocks(Blocks && result_blocks) +Block mergeBlocks(Blocks && blocks) { - auto & sample_block = result_blocks[0]; + auto & sample_block = blocks[0]; MutableColumns dst_columns(sample_block.columns()); for (size_t i = 0; i < sample_block.columns(); i++) { dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty(); } - for (auto & current_block : result_blocks) + for (auto & current_block : blocks) { if (current_block.rows() > 0) { diff --git a/dbms/src/Core/Block.h b/dbms/src/Core/Block.h index 4ee5eeeb9b5..5f2cabe7859 100644 --- a/dbms/src/Core/Block.h +++ b/dbms/src/Core/Block.h @@ -158,7 +158,7 @@ class Block using Blocks = std::vector; using BlocksList = std::list; -Block mergeBlocks(Blocks && result_blocks); +Block mergeBlocks(Blocks && blocks); /// Compare number of columns, data types, column types, column names, and values of constant columns. bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs); From 2859767aa30f68bef085b1ad635bb53b64e1afc4 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Mon, 26 Dec 2022 12:09:35 +0800 Subject: [PATCH 05/15] add empty check in mergeBlocks function. --- dbms/src/Core/Block.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index a6d96153090..41102b576c3 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -516,6 +516,7 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons Block mergeBlocks(Blocks && blocks) { + assert(!blocks.empty()); auto & sample_block = blocks[0]; MutableColumns dst_columns(sample_block.columns()); for (size_t i = 0; i < sample_block.columns(); i++) From a3c0cd37cc11fa92b90cd37422ab91adede80773 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Mon, 26 Dec 2022 13:07:59 +0800 Subject: [PATCH 06/15] modify according to comments --- dbms/src/Core/Block.cpp | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 41102b576c3..8ac27aed883 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -518,18 +518,27 @@ Block mergeBlocks(Blocks && blocks) { assert(!blocks.empty()); auto & sample_block = blocks[0]; + size_t result_rows = 0; + for (const auto & block : blocks) + { + result_rows += block.rows(); + } + MutableColumns dst_columns(sample_block.columns()); + for (size_t i = 0; i < sample_block.columns(); i++) { - dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty(); + dst_columns[i] = (*std::move(sample_block.getByPosition(i).column)).mutate(); + dst_columns[i]->reserve(result_rows); } - for (auto & current_block : blocks) + + for (size_t i = 1; i < blocks.size(); ++i) { - if (current_block.rows() > 0) + if (blocks[i].rows() > 0) { - for (size_t column = 0; column < current_block.columns(); column++) + for (size_t column = 0; column < blocks[i].columns(); column++) { - dst_columns[column]->insertRangeFrom(*current_block.getByPosition(column).column, 0, current_block.rows()); + dst_columns[column]->insertRangeFrom(*blocks[i].getByPosition(column).column, 0, blocks[i].rows()); } } } From 7ae36dc6126934dbb76fc9ed2025a27f5a50fd04 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Mon, 26 Dec 2022 13:25:32 +0800 Subject: [PATCH 07/15] change param name --- dbms/src/Core/Block.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 8ac27aed883..9c8dd8ab74c 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -517,18 +517,18 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons Block mergeBlocks(Blocks && blocks) { assert(!blocks.empty()); - auto & sample_block = blocks[0]; + auto & first_block = blocks[0]; size_t result_rows = 0; for (const auto & block : blocks) { result_rows += block.rows(); } - MutableColumns dst_columns(sample_block.columns()); + MutableColumns dst_columns(first_block.columns()); - for (size_t i = 0; i < sample_block.columns(); i++) + for (size_t i = 0; i < first_block.columns(); i++) { - dst_columns[i] = (*std::move(sample_block.getByPosition(i).column)).mutate(); + dst_columns[i] = (*std::move(first_block.getByPosition(i).column)).mutate(); dst_columns[i]->reserve(result_rows); } @@ -542,7 +542,7 @@ Block mergeBlocks(Blocks && blocks) } } } - return sample_block.cloneWithColumns(std::move(dst_columns)); + return first_block.cloneWithColumns(std::move(dst_columns)); } bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs) From 14d449ea89fd282bb1333972491585f7d48b49fb Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Mon, 26 Dec 2022 15:58:21 +0800 Subject: [PATCH 08/15] modify according to comments --- .../HashJoinProbeBlockInputStream.cpp | 116 +++++++++++------- .../HashJoinProbeBlockInputStream.h | 22 +++- 2 files changed, 92 insertions(+), 46 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index d7014a96739..1578cacc605 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -25,7 +25,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream( : log(Logger::get(req_id)) , join(join_) , probe_process_info(max_block_size) - , join_finished(false) + , squashing_transform(max_block_size) { children.push_back(input); @@ -68,53 +68,20 @@ Block HashJoinProbeBlockInputStream::getHeader() const Block HashJoinProbeBlockInputStream::readImpl() { - // if join finished, return {} - if (join_finished) + // if join finished, return {} directly. + if (squashing_transform.isJoinFinished()) { return Block{}; } - result_blocks.clear(); - size_t output_rows = 0; - - // if over_limit_block is not null, we need to push it into result_blocks first. - if (over_limit_block) - { - output_rows += over_limit_block.rows(); - result_blocks.push_back(std::move(over_limit_block)); - over_limit_block = Block{}; - } + squashing_transform.handleOverLimitBlock(); - while (output_rows <= probe_process_info.max_block_size) + while (squashing_transform.needAppendBlock()) { Block result_block = getOutputBlock(probe_process_info); - - if (!result_block) - { - // if result blocks is not empty, merge and return them, then mark join finished. - if (!result_blocks.empty()) - { - join_finished = true; - return mergeResultBlocks(std::move(result_blocks)); - } - // if result blocks is empty, return result block directly. - return result_block; - } - size_t current_rows = result_block.rows(); - - if (!output_rows || output_rows + current_rows <= probe_process_info.max_block_size) - { - result_blocks.push_back(result_block); - } - else - { - // if output_rows + current_rows > max block size, put the current result block into over_limit_block and handle it in next read. - over_limit_block = result_block; - } - output_rows += current_rows; + squashing_transform.appendBlock(result_block); } - - return mergeResultBlocks(std::move(result_blocks)); + return squashing_transform.getFinalOutputBlock(); } Block HashJoinProbeBlockInputStream::getOutputBlock(ProbeProcessInfo & probe_process_info_) const @@ -133,16 +100,77 @@ Block HashJoinProbeBlockInputStream::getOutputBlock(ProbeProcessInfo & probe_pro return join->joinBlock(probe_process_info_); } -Block HashJoinProbeBlockInputStream::mergeResultBlocks(Blocks && result_blocks) +SquashingHashJoinBlockTransform::SquashingHashJoinBlockTransform(UInt64 max_block_size_) + : output_rows(0) + , max_block_size(max_block_size_) + , join_finished(false) +{} + +void SquashingHashJoinBlockTransform::handleOverLimitBlock() +{ + // we need to reset squash before handle over limit block; + reset(); + // if over_limit_block is not null, we need to push it into blocks first. + if (over_limit_block) + { + output_rows += over_limit_block.rows(); + blocks.push_back(std::move(over_limit_block)); + over_limit_block = Block{}; + } +} + +void SquashingHashJoinBlockTransform::appendBlock(Block block) { - if (result_blocks.size() == 1) + if (!block) { - return result_blocks[0]; + // if append block is {}, mark join finished. + join_finished = true; + return; + } + size_t current_rows = block.rows(); + + if (!output_rows || output_rows + current_rows <= max_block_size) + { + blocks.push_back(block); } else { - return mergeBlocks(std::move(result_blocks)); + // if output_rows + current_rows > max block size, put the current result block into over_limit_block and handle it in next read. + over_limit_block = block; + } + output_rows += current_rows; +} + +Block SquashingHashJoinBlockTransform::getFinalOutputBlock() +{ + if (blocks.empty()) + { + return {}; + } + else if (blocks.size() == 1) + { + return blocks[0]; } + else + { + return mergeBlocks(std::move(blocks)); + } +} + +void SquashingHashJoinBlockTransform::reset() +{ + blocks.clear(); + output_rows = 0; +} + +bool SquashingHashJoinBlockTransform::isJoinFinished() const +{ + return join_finished; +} + +bool SquashingHashJoinBlockTransform::needAppendBlock() const +{ + return output_rows <= max_block_size && !join_finished; } } // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index 9bff9b1beed..6567e6d7ca1 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -20,6 +20,25 @@ namespace DB { +class SquashingHashJoinBlockTransform +{ +public: + SquashingHashJoinBlockTransform(UInt64 max_block_size_); + + void handleOverLimitBlock(); + void appendBlock(Block block); + Block getFinalOutputBlock(); + void reset(); + bool isJoinFinished() const; + bool needAppendBlock() const; + +private: + Blocks blocks; + Block over_limit_block; + size_t output_rows; + UInt64 max_block_size; + bool join_finished; +}; /** Executes a certain expression over the block. * Basically the same as ExpressionBlockInputStream, @@ -45,7 +64,6 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream Block getTotals() override; Block getHeader() const override; Block getOutputBlock(ProbeProcessInfo & probe_process_info_) const; - static Block mergeResultBlocks(Blocks && result_blocks); protected: Block readImpl() override; @@ -56,7 +74,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream ProbeProcessInfo probe_process_info; Blocks result_blocks; Block over_limit_block; - bool join_finished; + SquashingHashJoinBlockTransform squashing_transform; }; } // namespace DB From 23ec73d22fd30f83b35f578ad0eac8fd313d3589 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Tue, 27 Dec 2022 10:17:41 +0800 Subject: [PATCH 09/15] modify according to comments --- dbms/src/Core/Block.cpp | 4 +- .../HashJoinProbeBlockInputStream.cpp | 76 -------------- .../HashJoinProbeBlockInputStream.h | 21 +--- .../SquashingHashJoinBlockTransform.cpp | 98 +++++++++++++++++++ .../SquashingHashJoinBlockTransform.h | 46 +++++++++ .../gtest_squashing_hash_join_transform.cpp | 82 ++++++++++++++++ 6 files changed, 229 insertions(+), 98 deletions(-) create mode 100644 dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp create mode 100644 dbms/src/DataStreams/SquashingHashJoinBlockTransform.h create mode 100644 dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 9c8dd8ab74c..e9c7f1a5697 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -534,9 +534,9 @@ Block mergeBlocks(Blocks && blocks) for (size_t i = 1; i < blocks.size(); ++i) { - if (blocks[i].rows() > 0) + if (likely(blocks[i].rows()) > 0) { - for (size_t column = 0; column < blocks[i].columns(); column++) + for (size_t column = 0; column < blocks[i].columns(); ++column) { dst_columns[column]->insertRangeFrom(*blocks[i].getByPosition(column).column, 0, blocks[i].rows()); } diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 1578cacc605..9bdf86e7175 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -13,7 +13,6 @@ // limitations under the License. #include -#include namespace DB { @@ -74,8 +73,6 @@ Block HashJoinProbeBlockInputStream::readImpl() return Block{}; } - squashing_transform.handleOverLimitBlock(); - while (squashing_transform.needAppendBlock()) { Block result_block = getOutputBlock(probe_process_info); @@ -100,77 +97,4 @@ Block HashJoinProbeBlockInputStream::getOutputBlock(ProbeProcessInfo & probe_pro return join->joinBlock(probe_process_info_); } -SquashingHashJoinBlockTransform::SquashingHashJoinBlockTransform(UInt64 max_block_size_) - : output_rows(0) - , max_block_size(max_block_size_) - , join_finished(false) -{} - -void SquashingHashJoinBlockTransform::handleOverLimitBlock() -{ - // we need to reset squash before handle over limit block; - reset(); - // if over_limit_block is not null, we need to push it into blocks first. - if (over_limit_block) - { - output_rows += over_limit_block.rows(); - blocks.push_back(std::move(over_limit_block)); - over_limit_block = Block{}; - } -} - -void SquashingHashJoinBlockTransform::appendBlock(Block block) -{ - if (!block) - { - // if append block is {}, mark join finished. - join_finished = true; - return; - } - size_t current_rows = block.rows(); - - if (!output_rows || output_rows + current_rows <= max_block_size) - { - blocks.push_back(block); - } - else - { - // if output_rows + current_rows > max block size, put the current result block into over_limit_block and handle it in next read. - over_limit_block = block; - } - output_rows += current_rows; -} - -Block SquashingHashJoinBlockTransform::getFinalOutputBlock() -{ - if (blocks.empty()) - { - return {}; - } - else if (blocks.size() == 1) - { - return blocks[0]; - } - else - { - return mergeBlocks(std::move(blocks)); - } -} - -void SquashingHashJoinBlockTransform::reset() -{ - blocks.clear(); - output_rows = 0; -} - -bool SquashingHashJoinBlockTransform::isJoinFinished() const -{ - return join_finished; -} - -bool SquashingHashJoinBlockTransform::needAppendBlock() const -{ - return output_rows <= max_block_size && !join_finished; -} - } // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index 6567e6d7ca1..82fe885b83f 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -15,31 +15,12 @@ #pragma once #include +#include #include namespace DB { -class SquashingHashJoinBlockTransform -{ -public: - SquashingHashJoinBlockTransform(UInt64 max_block_size_); - - void handleOverLimitBlock(); - void appendBlock(Block block); - Block getFinalOutputBlock(); - void reset(); - bool isJoinFinished() const; - bool needAppendBlock() const; - -private: - Blocks blocks; - Block over_limit_block; - size_t output_rows; - UInt64 max_block_size; - bool join_finished; -}; - /** Executes a certain expression over the block. * Basically the same as ExpressionBlockInputStream, * but requires that there must be a join probe action in the Expression. diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp new file mode 100644 index 00000000000..e6705b393d7 --- /dev/null +++ b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp @@ -0,0 +1,98 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB +{ + +SquashingHashJoinBlockTransform::SquashingHashJoinBlockTransform(UInt64 max_block_size_) + : output_rows(0) + , max_block_size(max_block_size_) + , join_finished(false) + , over_limit(false) +{} + +void SquashingHashJoinBlockTransform::handleOverLimitBlock() +{ + // if over_limit_block is not null, we need to push it into blocks first. + if (over_limit_block.has_value()) + { + output_rows += over_limit_block->rows(); + blocks.push_back(std::move(over_limit_block.value())); + over_limit_block = {}; + } +} + +void SquashingHashJoinBlockTransform::appendBlock(Block block) +{ + handleOverLimitBlock(); + if (!block) + { + // if append block is {}, mark join finished. + join_finished = true; + return; + } + size_t current_rows = block.rows(); + + over_limit = output_rows && output_rows + current_rows > max_block_size; + if (!over_limit) + { + blocks.push_back(block); + output_rows += current_rows; + } + else + { + // if output_rows + current_rows > max block size, put the current result block into over_limit_block and handle it in next read. + over_limit_block = block; + } +} + +Block SquashingHashJoinBlockTransform::getFinalOutputBlock() +{ + Block final_block; + if (blocks.empty()) + { + final_block = {}; + } + else if (blocks.size() == 1) + { + final_block = blocks[0]; + } + else + { + final_block = mergeBlocks(std::move(blocks)); + } + reset(); + return final_block; +} + +void SquashingHashJoinBlockTransform::reset() +{ + blocks.clear(); + output_rows = 0; + over_limit = false; +} + +bool SquashingHashJoinBlockTransform::isJoinFinished() const +{ + return join_finished; +} + +bool SquashingHashJoinBlockTransform::needAppendBlock() const +{ + return !over_limit && !join_finished; +} + +} // namespace DB diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h new file mode 100644 index 00000000000..efa8f18212a --- /dev/null +++ b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h @@ -0,0 +1,46 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ + +class SquashingHashJoinBlockTransform +{ +public: + SquashingHashJoinBlockTransform(UInt64 max_block_size_); + + void handleOverLimitBlock(); + void appendBlock(Block block); + Block getFinalOutputBlock(); + void reset(); + bool isJoinFinished() const; + bool needAppendBlock() const; + + +#ifndef DBMS_PUBLIC_GTEST +private: +#endif + Blocks blocks; + std::optional over_limit_block; + size_t output_rows; + UInt64 max_block_size; + bool join_finished; + bool over_limit; +}; + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp b/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp new file mode 100644 index 00000000000..604b93f72d5 --- /dev/null +++ b/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp @@ -0,0 +1,82 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + + +namespace DB +{ +namespace tests +{ +class SquashingHashJoinBlockTransformTest : public ::testing::Test +{ +public: + void SetUp() override {} + static ColumnWithTypeAndName toVec(const std::vector & v) + { + return createColumn(v); + } + + static void check(Blocks blocks, UInt64 max_block_size) + { + for (size_t i = 0; i < blocks.size(); ++i) + { + if (i == (blocks.size() - 1)) + { + ASSERT(blocks[i].rows() <= max_block_size); + break; + } + ASSERT(blocks[i].rows() == max_block_size); + } + } +}; + +TEST_F(SquashingHashJoinBlockTransformTest, testALL) +try +{ + Int64 rows = 30000; + Blocks test_blocks; + for (Int64 i = 0; i < rows; ++i) + { + Block block{toVec({i})}; + test_blocks.push_back(block); + } + test_blocks.push_back(Block{}); + + std::vector block_size{1, 5, 10, 99, 999, 9999, 39999, DEFAULT_BLOCK_SIZE}; + + for (auto size : block_size) + { + Blocks final_blocks; + size_t index = 0; + SquashingHashJoinBlockTransform squashing_transform(size); + while (!squashing_transform.isJoinFinished()) + { + while (squashing_transform.needAppendBlock()) + { + Block result_block = test_blocks[index++]; + squashing_transform.appendBlock(result_block); + } + final_blocks.push_back(squashing_transform.getFinalOutputBlock()); + } + check(final_blocks, std::min(size, rows)); + } +} +CATCH + +} // namespace tests +} // namespace DB \ No newline at end of file From 37697bb0537340731a5d96fd3e60a660cfffa98b Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Tue, 27 Dec 2022 14:15:33 +0800 Subject: [PATCH 10/15] modify according to comments --- dbms/src/Core/Block.cpp | 7 ++-- .../HashJoinProbeBlockInputStream.h | 2 -- .../SquashingHashJoinBlockTransform.cpp | 14 ++++---- .../SquashingHashJoinBlockTransform.h | 7 ++-- .../gtest_squashing_hash_join_transform.cpp | 9 ++++-- dbms/src/TestUtils/ExecutorTestUtils.cpp | 32 ++----------------- dbms/src/TestUtils/ExecutorTestUtils.h | 2 -- dbms/src/TestUtils/MPPTaskTestUtils.cpp | 2 +- 8 files changed, 23 insertions(+), 52 deletions(-) diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index e9c7f1a5697..a47fce7b373 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -516,7 +516,10 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons Block mergeBlocks(Blocks && blocks) { - assert(!blocks.empty()); + if (blocks.empty()) + { + return {}; + } auto & first_block = blocks[0]; size_t result_rows = 0; for (const auto & block : blocks) @@ -526,7 +529,7 @@ Block mergeBlocks(Blocks && blocks) MutableColumns dst_columns(first_block.columns()); - for (size_t i = 0; i < first_block.columns(); i++) + for (size_t i = 0; i < first_block.columns(); ++i) { dst_columns[i] = (*std::move(first_block.getByPosition(i).column)).mutate(); dst_columns[i]->reserve(result_rows); diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index 82fe885b83f..bb90f4bc190 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -53,8 +53,6 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream const LoggerPtr log; JoinPtr join; ProbeProcessInfo probe_process_info; - Blocks result_blocks; - Block over_limit_block; SquashingHashJoinBlockTransform squashing_transform; }; diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp index e6705b393d7..db7b6651375 100644 --- a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp +++ b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp @@ -26,9 +26,10 @@ SquashingHashJoinBlockTransform::SquashingHashJoinBlockTransform(UInt64 max_bloc void SquashingHashJoinBlockTransform::handleOverLimitBlock() { - // if over_limit_block is not null, we need to push it into blocks first. + // if over_limit_block is not null, we need to push it into blocks. if (over_limit_block.has_value()) { + assert(!(output_rows && blocks.empty())); output_rows += over_limit_block->rows(); blocks.push_back(std::move(over_limit_block.value())); over_limit_block = {}; @@ -37,7 +38,6 @@ void SquashingHashJoinBlockTransform::handleOverLimitBlock() void SquashingHashJoinBlockTransform::appendBlock(Block block) { - handleOverLimitBlock(); if (!block) { // if append block is {}, mark join finished. @@ -62,19 +62,17 @@ void SquashingHashJoinBlockTransform::appendBlock(Block block) Block SquashingHashJoinBlockTransform::getFinalOutputBlock() { Block final_block; - if (blocks.empty()) - { - final_block = {}; - } - else if (blocks.size() == 1) + + if (blocks.size() == 1) { - final_block = blocks[0]; + final_block = std::move(blocks[0]); } else { final_block = mergeBlocks(std::move(blocks)); } reset(); + handleOverLimitBlock(); return final_block; } diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h index efa8f18212a..34a9dd0a0e4 100644 --- a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h +++ b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h @@ -24,17 +24,16 @@ class SquashingHashJoinBlockTransform public: SquashingHashJoinBlockTransform(UInt64 max_block_size_); - void handleOverLimitBlock(); void appendBlock(Block block); Block getFinalOutputBlock(); - void reset(); bool isJoinFinished() const; bool needAppendBlock() const; -#ifndef DBMS_PUBLIC_GTEST private: -#endif + void handleOverLimitBlock(); + void reset(); + Blocks blocks; std::optional over_limit_block; size_t output_rows; diff --git a/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp b/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp index 604b93f72d5..483a6e9d7ae 100644 --- a/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp +++ b/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp @@ -48,9 +48,9 @@ class SquashingHashJoinBlockTransformTest : public ::testing::Test TEST_F(SquashingHashJoinBlockTransformTest, testALL) try { - Int64 rows = 30000; + Int64 expect_rows = 30000; Blocks test_blocks; - for (Int64 i = 0; i < rows; ++i) + for (Int64 i = 0; i < expect_rows; ++i) { Block block{toVec({i})}; test_blocks.push_back(block); @@ -63,6 +63,7 @@ try { Blocks final_blocks; size_t index = 0; + Int64 actual_rows = 0; SquashingHashJoinBlockTransform squashing_transform(size); while (!squashing_transform.isJoinFinished()) { @@ -72,8 +73,10 @@ try squashing_transform.appendBlock(result_block); } final_blocks.push_back(squashing_transform.getFinalOutputBlock()); + actual_rows += final_blocks.back().rows(); } - check(final_blocks, std::min(size, rows)); + check(final_blocks, std::min(size, expect_rows)); + ASSERT(actual_rows == expect_rows); } } CATCH diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 35099474a73..6becfe3d74d 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -166,34 +166,6 @@ void ExecutorTest::executeAndAssertRowsEqual(const std::shared_ptr actual_cols; - for (const auto & column : sample_block.getColumnsWithTypeAndName()) - { - actual_cols.push_back(column.type->createColumn()); - } - for (const auto & block : blocks) - { - for (size_t i = 0; i < block.columns(); ++i) - { - for (size_t j = 0; j < block.rows(); ++j) - { - actual_cols[i]->insert((*(block.getColumnsWithTypeAndName())[i].column)[j]); - } - } - } - - ColumnsWithTypeAndName actual_columns; - for (size_t i = 0; i < actual_cols.size(); ++i) - actual_columns.push_back({std::move(actual_cols[i]), sample_block.getColumnsWithTypeAndName()[i].type, sample_block.getColumnsWithTypeAndName()[i].name, sample_block.getColumnsWithTypeAndName()[i].column_id}); - return Block(actual_columns); -} - void readStream(Blocks & blocks, BlockInputStreamPtr stream) { stream->readPrefix(); @@ -214,7 +186,7 @@ DB::ColumnsWithTypeAndName readBlocks(std::vector streams) Blocks actual_blocks; for (const auto & stream : streams) readStream(actual_blocks, stream); - return mergeBlocksForTest(actual_blocks).getColumnsWithTypeAndName(); + return mergeBlocks(std::move(actual_blocks)).getColumnsWithTypeAndName(); } void ExecutorTest::enablePlanner(bool is_enable) @@ -231,7 +203,7 @@ DB::ColumnsWithTypeAndName ExecutorTest::executeStreams(const std::shared_ptrexecute([&blocks](const Block & block) { blocks.push_back(block); }).verify(); - return mergeBlocksForTest(blocks).getColumnsWithTypeAndName(); + return mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName(); } Blocks ExecutorTest::getExecuteStreamsReturnBlocks(const std::shared_ptr & request, size_t concurrency) diff --git a/dbms/src/TestUtils/ExecutorTestUtils.h b/dbms/src/TestUtils/ExecutorTestUtils.h index 06f66644724..fe00d14608b 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.h +++ b/dbms/src/TestUtils/ExecutorTestUtils.h @@ -30,8 +30,6 @@ TiDB::TP dataTypeToTP(const DataTypePtr & type); ColumnsWithTypeAndName readBlock(BlockInputStreamPtr stream); ColumnsWithTypeAndName readBlocks(std::vector streams); -Block mergeBlocksForTest(Blocks blocks); - #define WRAP_FOR_DIS_ENABLE_PLANNER_BEGIN \ std::vector bools{false, true}; \ diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.cpp b/dbms/src/TestUtils/MPPTaskTestUtils.cpp index 25fb4f70383..143a9a78034 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.cpp +++ b/dbms/src/TestUtils/MPPTaskTestUtils.cpp @@ -93,7 +93,7 @@ ColumnsWithTypeAndName extractColumns(Context & context, const std::shared_ptrchunks()) blocks.emplace_back(codec->decode(chunk.rows_data(), schema)); - return mergeBlocksForTest(blocks).getColumnsWithTypeAndName(); + return mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName(); } ColumnsWithTypeAndName MPPTaskTestUtils::executeCoprocessorTask(std::shared_ptr & dag_request) From 2ff245128d1d4847d6d5e7e8396b26d0bad290d5 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Tue, 27 Dec 2022 14:29:54 +0800 Subject: [PATCH 11/15] modify according to comments --- dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp index db7b6651375..d5f338653fa 100644 --- a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp +++ b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp @@ -32,7 +32,7 @@ void SquashingHashJoinBlockTransform::handleOverLimitBlock() assert(!(output_rows && blocks.empty())); output_rows += over_limit_block->rows(); blocks.push_back(std::move(over_limit_block.value())); - over_limit_block = {}; + over_limit_block.reset(); } } From 6a1e16730987be0ae85aef7262c030248fccc5d3 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Tue, 27 Dec 2022 14:32:12 +0800 Subject: [PATCH 12/15] modify according to comments --- dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp | 2 +- dbms/src/DataStreams/SquashingHashJoinBlockTransform.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp index d5f338653fa..cc8e23266d4 100644 --- a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp +++ b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp @@ -36,7 +36,7 @@ void SquashingHashJoinBlockTransform::handleOverLimitBlock() } } -void SquashingHashJoinBlockTransform::appendBlock(Block block) +void SquashingHashJoinBlockTransform::appendBlock(Block & block) { if (!block) { diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h index 34a9dd0a0e4..14ca039e03e 100644 --- a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h +++ b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h @@ -24,7 +24,7 @@ class SquashingHashJoinBlockTransform public: SquashingHashJoinBlockTransform(UInt64 max_block_size_); - void appendBlock(Block block); + void appendBlock(Block & block); Block getFinalOutputBlock(); bool isJoinFinished() const; bool needAppendBlock() const; From 30887f294df03e927ce867fc8693bac7cecec618 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Tue, 27 Dec 2022 14:55:24 +0800 Subject: [PATCH 13/15] modify according to comments --- .../DataStreams/HashJoinProbeBlockInputStream.cpp | 10 +++++----- .../DataStreams/HashJoinProbeBlockInputStream.h | 2 +- .../SquashingHashJoinBlockTransform.cpp | 14 ++++++-------- .../DataStreams/SquashingHashJoinBlockTransform.h | 1 - 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index 9bdf86e7175..2fd304b162f 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -75,15 +75,15 @@ Block HashJoinProbeBlockInputStream::readImpl() while (squashing_transform.needAppendBlock()) { - Block result_block = getOutputBlock(probe_process_info); + Block result_block = getOutputBlock(); squashing_transform.appendBlock(result_block); } return squashing_transform.getFinalOutputBlock(); } -Block HashJoinProbeBlockInputStream::getOutputBlock(ProbeProcessInfo & probe_process_info_) const +Block HashJoinProbeBlockInputStream::getOutputBlock() { - if (probe_process_info_.all_rows_joined_finish) + if (probe_process_info.all_rows_joined_finish) { Block block = children.back()->read(); if (!block) @@ -91,10 +91,10 @@ Block HashJoinProbeBlockInputStream::getOutputBlock(ProbeProcessInfo & probe_pro return block; } join->checkTypes(block); - probe_process_info_.resetBlock(std::move(block)); + probe_process_info.resetBlock(std::move(block)); } - return join->joinBlock(probe_process_info_); + return join->joinBlock(probe_process_info); } } // namespace DB diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index bb90f4bc190..a3c7b12fce3 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -44,7 +44,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream String getName() const override { return name; } Block getTotals() override; Block getHeader() const override; - Block getOutputBlock(ProbeProcessInfo & probe_process_info_) const; + Block getOutputBlock(); protected: Block readImpl() override; diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp index cc8e23266d4..32e8b57dd2c 100644 --- a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp +++ b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp @@ -21,13 +21,12 @@ SquashingHashJoinBlockTransform::SquashingHashJoinBlockTransform(UInt64 max_bloc : output_rows(0) , max_block_size(max_block_size_) , join_finished(false) - , over_limit(false) {} void SquashingHashJoinBlockTransform::handleOverLimitBlock() { // if over_limit_block is not null, we need to push it into blocks. - if (over_limit_block.has_value()) + if (over_limit_block) { assert(!(output_rows && blocks.empty())); output_rows += over_limit_block->rows(); @@ -46,16 +45,16 @@ void SquashingHashJoinBlockTransform::appendBlock(Block & block) } size_t current_rows = block.rows(); - over_limit = output_rows && output_rows + current_rows > max_block_size; - if (!over_limit) + if (!output_rows || output_rows + current_rows <= max_block_size) { - blocks.push_back(block); + blocks.push_back(std::move(block)); output_rows += current_rows; } else { // if output_rows + current_rows > max block size, put the current result block into over_limit_block and handle it in next read. - over_limit_block = block; + assert(!over_limit_block); + over_limit_block.emplace(std::move(block)); } } @@ -80,7 +79,6 @@ void SquashingHashJoinBlockTransform::reset() { blocks.clear(); output_rows = 0; - over_limit = false; } bool SquashingHashJoinBlockTransform::isJoinFinished() const @@ -90,7 +88,7 @@ bool SquashingHashJoinBlockTransform::isJoinFinished() const bool SquashingHashJoinBlockTransform::needAppendBlock() const { - return !over_limit && !join_finished; + return !over_limit_block && !join_finished; } } // namespace DB diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h index 14ca039e03e..956dac0903f 100644 --- a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h +++ b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h @@ -39,7 +39,6 @@ class SquashingHashJoinBlockTransform size_t output_rows; UInt64 max_block_size; bool join_finished; - bool over_limit; }; } // namespace DB \ No newline at end of file From 282de941655db3f8197e7208c2d22e873c005737 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Tue, 27 Dec 2022 15:41:55 +0800 Subject: [PATCH 14/15] modify according to comments --- dbms/src/Core/Block.cpp | 6 ++++ .../HashJoinProbeBlockInputStream.h | 2 +- .../SquashingHashJoinBlockTransform.cpp | 11 +----- .../gtest_squashing_hash_join_transform.cpp | 34 +++++++++++-------- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index a47fce7b373..b8adade5a84 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -520,6 +520,12 @@ Block mergeBlocks(Blocks && blocks) { return {}; } + + if (blocks.size() == 1) + { + return std::move(blocks[0]); + } + auto & first_block = blocks[0]; size_t result_rows = 0; for (const auto & block : blocks) diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h index a3c7b12fce3..cf6e557d32c 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.h @@ -44,10 +44,10 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream String getName() const override { return name; } Block getTotals() override; Block getHeader() const override; - Block getOutputBlock(); protected: Block readImpl() override; + Block getOutputBlock(); private: const LoggerPtr log; diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp index 32e8b57dd2c..9c876d7883d 100644 --- a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp +++ b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp @@ -60,16 +60,7 @@ void SquashingHashJoinBlockTransform::appendBlock(Block & block) Block SquashingHashJoinBlockTransform::getFinalOutputBlock() { - Block final_block; - - if (blocks.size() == 1) - { - final_block = std::move(blocks[0]); - } - else - { - final_block = mergeBlocks(std::move(blocks)); - } + Block final_block = mergeBlocks(std::move(blocks)); reset(); handleOverLimitBlock(); return final_block; diff --git a/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp b/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp index 483a6e9d7ae..1f61878da48 100644 --- a/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp +++ b/dbms/src/Flash/tests/gtest_squashing_hash_join_transform.cpp @@ -35,12 +35,7 @@ class SquashingHashJoinBlockTransformTest : public ::testing::Test { for (size_t i = 0; i < blocks.size(); ++i) { - if (i == (blocks.size() - 1)) - { - ASSERT(blocks[i].rows() <= max_block_size); - break; - } - ASSERT(blocks[i].rows() == max_block_size); + ASSERT(blocks[i].rows() <= max_block_size); } } }; @@ -48,19 +43,28 @@ class SquashingHashJoinBlockTransformTest : public ::testing::Test TEST_F(SquashingHashJoinBlockTransformTest, testALL) try { - Int64 expect_rows = 30000; - Blocks test_blocks; - for (Int64 i = 0; i < expect_rows; ++i) - { - Block block{toVec({i})}; - test_blocks.push_back(block); - } - test_blocks.push_back(Block{}); - std::vector block_size{1, 5, 10, 99, 999, 9999, 39999, DEFAULT_BLOCK_SIZE}; + size_t merge_block_count = 10000; for (auto size : block_size) { + Int64 expect_rows = 0; + Blocks test_blocks; + + for (size_t i = 0; i < merge_block_count; ++i) + { + size_t rand_block_size = std::rand() % size + 1; + expect_rows += rand_block_size; + std::vector values; + for (size_t j = 0; j < rand_block_size; ++j) + { + values.push_back(1); + } + Block block{toVec(values)}; + test_blocks.push_back(block); + } + test_blocks.push_back(Block{}); + Blocks final_blocks; size_t index = 0; Int64 actual_rows = 0; From 61375307bb41a6251c5b742ce8564f24de8d21dd Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Tue, 27 Dec 2022 22:10:57 +0800 Subject: [PATCH 15/15] little change --- dbms/src/TestUtils/ExecutorTestUtils.cpp | 32 ++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 6becfe3d74d..c4f8999f9e1 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -166,6 +166,34 @@ void ExecutorTest::executeAndAssertRowsEqual(const std::shared_ptr actual_cols; + for (const auto & column : sample_block.getColumnsWithTypeAndName()) + { + actual_cols.push_back(column.type->createColumn()); + } + for (const auto & block : blocks) + { + for (size_t i = 0; i < block.columns(); ++i) + { + for (size_t j = 0; j < block.rows(); ++j) + { + actual_cols[i]->insert((*(block.getColumnsWithTypeAndName())[i].column)[j]); + } + } + } + + ColumnsWithTypeAndName actual_columns; + for (size_t i = 0; i < actual_cols.size(); ++i) + actual_columns.push_back({std::move(actual_cols[i]), sample_block.getColumnsWithTypeAndName()[i].type, sample_block.getColumnsWithTypeAndName()[i].name, sample_block.getColumnsWithTypeAndName()[i].column_id}); + return Block(actual_columns); +} + void readStream(Blocks & blocks, BlockInputStreamPtr stream) { stream->readPrefix(); @@ -186,7 +214,7 @@ DB::ColumnsWithTypeAndName readBlocks(std::vector streams) Blocks actual_blocks; for (const auto & stream : streams) readStream(actual_blocks, stream); - return mergeBlocks(std::move(actual_blocks)).getColumnsWithTypeAndName(); + return mergeBlocksForTest(std::move(actual_blocks)).getColumnsWithTypeAndName(); } void ExecutorTest::enablePlanner(bool is_enable) @@ -203,7 +231,7 @@ DB::ColumnsWithTypeAndName ExecutorTest::executeStreams(const std::shared_ptrexecute([&blocks](const Block & block) { blocks.push_back(block); }).verify(); - return mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName(); + return mergeBlocksForTest(std::move(blocks)).getColumnsWithTypeAndName(); } Blocks ExecutorTest::getExecuteStreamsReturnBlocks(const std::shared_ptr & request, size_t concurrency)