From 71e684a0084731fb8b330b2088765f7f58fb58b3 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 11 Apr 2023 10:03:23 +0800 Subject: [PATCH 1/9] merge join result block if needed Signed-off-by: xufei --- .../SquashingHashJoinBlockTransform.cpp | 85 ------ .../SquashingHashJoinBlockTransform.h | 44 ---- dbms/src/Flash/tests/gtest_join_executor.cpp | 41 +++ dbms/src/Interpreters/Join.cpp | 243 ++++++++++-------- dbms/src/Interpreters/JoinUtils.cpp | 2 + dbms/src/Interpreters/JoinUtils.h | 2 + 6 files changed, 179 insertions(+), 238 deletions(-) delete mode 100644 dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp delete mode 100644 dbms/src/DataStreams/SquashingHashJoinBlockTransform.h diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp deleted file mode 100644 index 7a43984088c..00000000000 --- a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -namespace DB -{ - -SquashingHashJoinBlockTransform::SquashingHashJoinBlockTransform(UInt64 max_block_size_) - : output_rows(0) - , max_block_size(max_block_size_) - , join_finished(false) -{} - -void SquashingHashJoinBlockTransform::handleOverLimitBlock() -{ - // if over_limit_block is not null, we need to push it into blocks. - if (over_limit_block) - { - assert(!(output_rows && blocks.empty())); - output_rows += over_limit_block->rows(); - blocks.push_back(std::move(over_limit_block.value())); - over_limit_block.reset(); - } -} - -void SquashingHashJoinBlockTransform::appendBlock(Block & block) -{ - if (!block) - { - // if append block is {}, mark join finished. - join_finished = true; - return; - } - size_t current_rows = block.rows(); - - if (!output_rows || output_rows + current_rows <= max_block_size) - { - blocks.push_back(std::move(block)); - output_rows += current_rows; - } - else - { - // if output_rows + current_rows > max block size, put the current result block into over_limit_block and handle it in next read. - assert(!over_limit_block); - over_limit_block.emplace(std::move(block)); - } -} - -Block SquashingHashJoinBlockTransform::getFinalOutputBlock() -{ - Block final_block = vstackBlocks(std::move(blocks)); - reset(); - handleOverLimitBlock(); - return final_block; -} - -void SquashingHashJoinBlockTransform::reset() -{ - blocks.clear(); - output_rows = 0; -} - -bool SquashingHashJoinBlockTransform::isJoinFinished() const -{ - return join_finished; -} - -bool SquashingHashJoinBlockTransform::needAppendBlock() const -{ - return !over_limit_block && !join_finished; -} - -} // namespace DB diff --git a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h b/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h deleted file mode 100644 index 956dac0903f..00000000000 --- a/dbms/src/DataStreams/SquashingHashJoinBlockTransform.h +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -namespace DB -{ - -class SquashingHashJoinBlockTransform -{ -public: - SquashingHashJoinBlockTransform(UInt64 max_block_size_); - - void appendBlock(Block & block); - Block getFinalOutputBlock(); - bool isJoinFinished() const; - bool needAppendBlock() const; - - -private: - void handleOverLimitBlock(); - void reset(); - - Blocks blocks; - std::optional over_limit_block; - size_t output_rows; - UInt64 max_block_size; - bool join_finished; -}; - -} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/tests/gtest_join_executor.cpp b/dbms/src/Flash/tests/gtest_join_executor.cpp index d332011c51d..44d2c4ec75e 100644 --- a/dbms/src/Flash/tests/gtest_join_executor.cpp +++ b/dbms/src/Flash/tests/gtest_join_executor.cpp @@ -800,6 +800,47 @@ try } CATCH +TEST_F(JoinExecutorTestRunner, MergeAfterSplit) +try +{ + context.addMockTable("split_test", "t1", {{"a", TiDB::TP::TypeLong}, {"b", TiDB::TP::TypeLong}}, {toVec("a", {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}), toVec("b", {2, 2, 2, 2, 2, 2, 2, 2, 2, 2})}); + context.addMockTable("split_test", "t2", {{"a", TiDB::TP::TypeLong}, {"c", TiDB::TP::TypeLong}}, {toVec("a", {1, 1, 1, 1, 1}), toVec("c", {1, 2, 3, 4, 5})}); + + auto request = context + .scan("split_test", "t1") + .join(context.scan("split_test", "t2"), tipb::JoinType::TypeInnerJoin, {col("a")}, {}, {}, {gt(col("b"), col("c"))}, {}) + .build(context); + + std::vector block_sizes{ + 1, + 2, + 7, + 25, + 49, + 50, + 51, + DEFAULT_BLOCK_SIZE}; + std::vector> expect{ + {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, + {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, + {4, 3, 2, 1}, + {5, 5}, + {9, 1}, + {10}, + {10}, + {10}}; + for (size_t i = 0; i < block_sizes.size(); ++i) + { + context.context->setSetting("max_block_size", Field(static_cast(block_sizes[i]))); + auto blocks = getExecuteStreamsReturnBlocks(request); + ASSERT_EQ(expect[i].size(), blocks.size()); + for (size_t j = 0; j < blocks.size(); ++j) + { + ASSERT_EQ(expect[i][j], blocks[j].rows()); + } + } +} +CATCH TEST_F(JoinExecutorTestRunner, SpillToDisk) try diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index fd0c5908943..5b34eef28cc 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -823,147 +823,173 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const { - Block block = probe_process_info.block; - size_t keys_size = key_names_left.size(); - - /// Rare case, when keys are constant. To avoid code bloat, simply materialize them. - /// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function. - Columns materialized_columns; - ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(block, materialized_columns, key_names_left); - - /// Keys with NULL value in any column won't join to anything. - ColumnPtr null_map_holder; - ConstNullMapPtr null_map{}; - extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map); - /// reuse null_map to record the filtered rows, the rows contains NULL or does not - /// match the join filter won't join to anything - recordFilteredRows(block, non_equal_conditions.left_filter_column, null_map_holder, null_map); - - size_t existing_columns = block.columns(); - - /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized. + std::vector result_blocks; + size_t result_rows = 0; + while (true) + { + probe_process_info.updateStartRow(); + /// this makes a copy of `probe_process_info.block` + Block block = probe_process_info.block; + size_t keys_size = key_names_left.size(); + + /// Rare case, when keys are constant. To avoid code bloat, simply materialize them. + /// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function. + Columns materialized_columns; + ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(block, materialized_columns, key_names_left); + + /// Keys with NULL value in any column won't join to anything. + ColumnPtr null_map_holder; + ConstNullMapPtr null_map{}; + extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map); + /// reuse null_map to record the filtered rows, the rows contains NULL or does not + /// match the join filter won't join to anything + recordFilteredRows(block, non_equal_conditions.left_filter_column, null_map_holder, null_map); + + size_t existing_columns = block.columns(); + + /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized. * Because if they are constants, then in the "not joined" rows, they may have different values * - default values, which can differ from the values of these constants. */ - if (getFullness(kind)) - { - for (size_t i = 0; i < existing_columns; ++i) + if (getFullness(kind)) { - auto & col = block.getByPosition(i).column; + for (size_t i = 0; i < existing_columns; ++i) + { + auto & col = block.getByPosition(i).column; - if (ColumnPtr converted = col->convertToFullColumnIfConst()) - col = converted; + if (ColumnPtr converted = col->convertToFullColumnIfConst()) + col = converted; - /// convert left columns (except keys) to Nullable - if (std::end(key_names_left) == std::find(key_names_left.begin(), key_names_left.end(), block.getByPosition(i).name)) - convertColumnToNullable(block.getByPosition(i)); + /// convert left columns (except keys) to Nullable + if (std::end(key_names_left) == std::find(key_names_left.begin(), key_names_left.end(), block.getByPosition(i).name)) + convertColumnToNullable(block.getByPosition(i)); + } } - } - /** For LEFT/INNER JOIN, the saved blocks do not contain keys. + /** For LEFT/INNER JOIN, the saved blocks do not contain keys. * For FULL/RIGHT JOIN, the saved blocks contain keys; * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. */ - size_t num_columns_to_skip = 0; - if (getFullness(kind)) - num_columns_to_skip = keys_size; - - /// Add new columns to the block. - size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); + size_t num_columns_to_skip = 0; + if (getFullness(kind)) + num_columns_to_skip = keys_size; - std::vector right_table_column_indexes; - right_table_column_indexes.reserve(num_columns_to_add); + /// Add new columns to the block. + size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); - for (size_t i = 0; i < num_columns_to_add; ++i) - { - right_table_column_indexes.push_back(i + existing_columns); - } + std::vector right_table_column_indexes; + right_table_column_indexes.reserve(num_columns_to_add); - MutableColumns added_columns; - added_columns.reserve(num_columns_to_add); + for (size_t i = 0; i < num_columns_to_add; ++i) + { + right_table_column_indexes.push_back(i + existing_columns); + } - std::vector right_indexes; - right_indexes.reserve(num_columns_to_add); + MutableColumns added_columns; + added_columns.reserve(num_columns_to_add); - for (size_t i = 0; i < num_columns_to_add; ++i) - { - const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); - RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name); - - added_columns.push_back(src_column.column->cloneEmpty()); - added_columns.back()->reserve(src_column.column->size()); - right_indexes.push_back(num_columns_to_skip + i); - } + std::vector right_indexes; + right_indexes.reserve(num_columns_to_add); - size_t rows = block.rows(); + for (size_t i = 0; i < num_columns_to_add; ++i) + { + const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); + RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name); - /// Used with ANY INNER JOIN - std::unique_ptr filter; + added_columns.push_back(src_column.column->cloneEmpty()); + added_columns.back()->reserve(src_column.column->size()); + right_indexes.push_back(num_columns_to_skip + i); + } - if (((kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Right) && strictness == ASTTableJoin::Strictness::Any) - || kind == ASTTableJoin::Kind::Anti) - filter = std::make_unique(rows); + size_t rows = block.rows(); - /// Used with ALL ... JOIN - IColumn::Offset current_offset = 0; - std::unique_ptr offsets_to_replicate; + /// Used with ANY INNER JOIN + std::unique_ptr filter; - if (strictness == ASTTableJoin::Strictness::All) - offsets_to_replicate = std::make_unique(rows); + if (((kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Right) && strictness == ASTTableJoin::Strictness::Any) + || kind == ASTTableJoin::Kind::Anti) + filter = std::make_unique(rows); - bool enable_spill_join = isEnableSpill(); - JoinBuildInfo join_build_info{enable_fine_grained_shuffle, fine_grained_shuffle_count, enable_spill_join, is_spilled, build_concurrency, restore_round}; - JoinPartition::probeBlock(partitions, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators, join_build_info, probe_process_info); - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint); - for (size_t i = 0; i < num_columns_to_add; ++i) - { - const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i); - block.insert(ColumnWithTypeAndName(std::move(added_columns[i]), sample_col.type, sample_col.name)); - } + /// Used with ALL ... JOIN + IColumn::Offset current_offset = 0; + std::unique_ptr offsets_to_replicate; - size_t process_rows = probe_process_info.end_row - probe_process_info.start_row; + if (strictness == ASTTableJoin::Strictness::All) + offsets_to_replicate = std::make_unique(rows); - // if rows equal 0, we could ignore filter and offsets_to_replicate, and do not need to update start row. - if (likely(rows != 0)) - { - /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones. - if (filter && !(kind == ASTTableJoin::Kind::Anti && strictness == ASTTableJoin::Strictness::All)) + bool enable_spill_join = isEnableSpill(); + JoinBuildInfo join_build_info{enable_fine_grained_shuffle, fine_grained_shuffle_count, enable_spill_join, is_spilled, build_concurrency, restore_round}; + JoinPartition::probeBlock(partitions, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators, join_build_info, probe_process_info); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint); + for (size_t i = 0; i < num_columns_to_add; ++i) { - // If ANY INNER | RIGHT JOIN, the result will not be spilt, so the block rows must equal process_rows. - RUNTIME_CHECK(rows == process_rows); - for (size_t i = 0; i < existing_columns; ++i) - block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1); + const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i); + block.insert(ColumnWithTypeAndName(std::move(added_columns[i]), sample_col.type, sample_col.name)); } - /// If ALL ... JOIN - we replicate all the columns except the new ones. - if (offsets_to_replicate) + size_t process_rows = probe_process_info.end_row - probe_process_info.start_row; + + // if rows equal 0, we could ignore filter and offsets_to_replicate, and do not need to update start row. + if (likely(rows != 0)) { - for (size_t i = 0; i < existing_columns; ++i) + /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones. + if (filter && !(kind == ASTTableJoin::Kind::Anti && strictness == ASTTableJoin::Strictness::All)) { - block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicateRange(probe_process_info.start_row, probe_process_info.end_row, *offsets_to_replicate); + // If ANY INNER | RIGHT JOIN, the result will not be spilt, so the block rows must equal process_rows. + RUNTIME_CHECK(rows == process_rows); + for (size_t i = 0; i < existing_columns; ++i) + block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1); } - if (rows != process_rows) + /// If ALL ... JOIN - we replicate all the columns except the new ones. + if (offsets_to_replicate) { - if (isLeftSemiFamily(kind)) + for (size_t i = 0; i < existing_columns; ++i) + { + block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicateRange(probe_process_info.start_row, probe_process_info.end_row, *offsets_to_replicate); + } + + if (rows != process_rows) { - auto helper_col = block.getByName(match_helper_name).column; - helper_col = helper_col->cut(probe_process_info.start_row, probe_process_info.end_row); + if (isLeftSemiFamily(kind)) + { + auto helper_col = block.getByName(match_helper_name).column; + helper_col = helper_col->cut(probe_process_info.start_row, probe_process_info.end_row); + } + offsets_to_replicate->assign(offsets_to_replicate->begin() + probe_process_info.start_row, offsets_to_replicate->begin() + probe_process_info.end_row); } - offsets_to_replicate->assign(offsets_to_replicate->begin() + probe_process_info.start_row, offsets_to_replicate->begin() + probe_process_info.end_row); } } - } - /// handle other conditions - if (!non_equal_conditions.other_cond_name.empty() || !non_equal_conditions.other_eq_cond_from_in_name.empty()) + /// handle other conditions + if (!non_equal_conditions.other_cond_name.empty() || !non_equal_conditions.other_eq_cond_from_in_name.empty()) + { + if (!offsets_to_replicate) + throw Exception("Should not reach here, the strictness of join with other condition must be ALL"); + handleOtherConditions(block, filter, offsets_to_replicate, right_table_column_indexes); + } + result_rows += block.rows(); + /// exit the while loop if + /// 1. probe_process_info.all_rows_joined_finish is true, which means all the rows in current block is processed + /// 2. result_rows exceeds the min_result_block_size + if (probe_process_info.all_rows_joined_finish || result_rows >= probe_process_info.min_result_block_size) + { + if (result_blocks.empty()) + return block; + result_blocks.push_back(std::move(block)); + break; + } + result_blocks.push_back(std::move(block)); + } + assert(!result_blocks.empty()); + if (result_blocks.size() == 1) { - if (!offsets_to_replicate) - throw Exception("Should not reach here, the strictness of join with other condition must be ALL"); - handleOtherConditions(block, filter, offsets_to_replicate, right_table_column_indexes); + return result_blocks[0]; + } + else + { + return vstackBlocks(std::move(result_blocks)); } - - return block; } namespace @@ -1245,6 +1271,8 @@ Block Join::joinBlockCross(ProbeProcessInfo & probe_process_info) const DISPATCH(false) } #undef DISPATCH + /// todo control the returned block size for cross join + probe_process_info.all_rows_joined_finish = true; return block; } @@ -1307,6 +1335,9 @@ Block Join::joinBlockNullAware(ProbeProcessInfo & probe_process_info) const FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint); + /// Null aware join never expand the left block, just handle the whole block at one time is enough + probe_process_info.all_rows_joined_finish = true; + return block; } @@ -1585,6 +1616,7 @@ void Join::finishOneNonJoin(size_t partition_index) Block Join::joinBlock(ProbeProcessInfo & probe_process_info, bool dry_run) const { + assert(!probe_process_info.all_rows_joined_finish); if unlikely (dry_run) { assert(probe_process_info.block.rows() == 0); @@ -1600,8 +1632,6 @@ Block Join::joinBlock(ProbeProcessInfo & probe_process_info, bool dry_run) const } std::shared_lock lock(rwlock); - probe_process_info.updateStartRow(); - Block block{}; using enum ASTTableJoin::Strictness; @@ -1629,11 +1659,6 @@ Block Join::joinBlock(ProbeProcessInfo & probe_process_info, bool dry_run) const block.getByName(match_helper_name).column = ColumnNullable::create(std::move(col_non_matched), std::move(nullable_column->getNullMapColumnPtr())); } - if (isCrossJoin(kind) || isNullAwareSemiFamily(kind)) - { - probe_process_info.all_rows_joined_finish = true; - } - return block; } diff --git a/dbms/src/Interpreters/JoinUtils.cpp b/dbms/src/Interpreters/JoinUtils.cpp index a0b2b73883e..8cbe6f72b1b 100644 --- a/dbms/src/Interpreters/JoinUtils.cpp +++ b/dbms/src/Interpreters/JoinUtils.cpp @@ -26,6 +26,8 @@ void ProbeProcessInfo::resetBlock(Block && block_, size_t partition_index_) all_rows_joined_finish = false; // If the probe block size is greater than max_block_size, we will set max_block_size to the probe block size to avoid some unnecessary split. max_block_size = std::max(max_block_size, block.rows()); + // min_result_block_size is use to avoid generating too many small block, use 50% of the block size as the default value + min_result_block_size = std::max(1, (std::min(block.rows(), max_block_size) + 1) / 2); } void ProbeProcessInfo::updateStartRow() diff --git a/dbms/src/Interpreters/JoinUtils.h b/dbms/src/Interpreters/JoinUtils.h index 6d3f0c154fd..fe4f721d4da 100644 --- a/dbms/src/Interpreters/JoinUtils.h +++ b/dbms/src/Interpreters/JoinUtils.h @@ -63,12 +63,14 @@ struct ProbeProcessInfo Block block; size_t partition_index; UInt64 max_block_size; + UInt64 min_result_block_size; size_t start_row; size_t end_row; bool all_rows_joined_finish; explicit ProbeProcessInfo(UInt64 max_block_size_) : max_block_size(max_block_size_) + , min_result_block_size((max_block_size + 1) / 2) , all_rows_joined_finish(true){}; void resetBlock(Block && block_, size_t partition_index_ = 0); From b83598a1f57a532d8177a06a0c670a5870571451 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 11 Apr 2023 10:38:48 +0800 Subject: [PATCH 2/9] fix build Signed-off-by: xufei --- .../gtest_squashing_hash_join_transform.cpp | 89 ------------------- 1 file changed, 89 deletions(-) delete mode 100644 dbms/src/DataStreams/tests/gtest_squashing_hash_join_transform.cpp diff --git a/dbms/src/DataStreams/tests/gtest_squashing_hash_join_transform.cpp b/dbms/src/DataStreams/tests/gtest_squashing_hash_join_transform.cpp deleted file mode 100644 index 7e38db3f60e..00000000000 --- a/dbms/src/DataStreams/tests/gtest_squashing_hash_join_transform.cpp +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include - - -namespace DB -{ -namespace tests -{ -class SquashingHashJoinBlockTransformTest : public ::testing::Test -{ -public: - void SetUp() override {} - static ColumnWithTypeAndName toVec(const std::vector & v) - { - return createColumn(v); - } - - static void check(Blocks blocks, UInt64 max_block_size) - { - for (auto & block : blocks) - { - ASSERT(block.rows() <= max_block_size); - } - } -}; - -TEST_F(SquashingHashJoinBlockTransformTest, testALL) -try -{ - std::vector block_size{1, 5, 10, 99, 999, 9999, 39999, DEFAULT_BLOCK_SIZE}; - size_t merge_block_count = 10000; - - for (auto size : block_size) - { - Int64 expect_rows = 0; - Blocks test_blocks; - - for (size_t i = 0; i < merge_block_count; ++i) - { - size_t rand_block_size = std::rand() % size + 1; - expect_rows += rand_block_size; - std::vector values; - for (size_t j = 0; j < rand_block_size; ++j) - { - values.push_back(1); - } - Block block{toVec(values)}; - test_blocks.push_back(block); - } - test_blocks.push_back(Block{}); - - Blocks final_blocks; - size_t index = 0; - Int64 actual_rows = 0; - SquashingHashJoinBlockTransform squashing_transform(size); - while (!squashing_transform.isJoinFinished()) - { - while (squashing_transform.needAppendBlock()) - { - Block result_block = test_blocks[index++]; - squashing_transform.appendBlock(result_block); - } - final_blocks.push_back(squashing_transform.getFinalOutputBlock()); - actual_rows += final_blocks.back().rows(); - } - check(final_blocks, std::min(size, expect_rows)); - ASSERT(actual_rows == expect_rows); - } -} -CATCH - -} // namespace tests -} // namespace DB \ No newline at end of file From 3f55fae9d7f10ad635db4837eba3f2075670af57 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 11 Apr 2023 10:59:20 +0800 Subject: [PATCH 3/9] address comments Signed-off-by: xufei --- dbms/src/Interpreters/Join.cpp | 217 +++++++++++++++++---------------- dbms/src/Interpreters/Join.h | 1 + 2 files changed, 113 insertions(+), 105 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 5b34eef28cc..25870430663 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -821,153 +821,160 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr throw Exception("Logical error: unknown combination of JOIN", ErrorCodes::LOGICAL_ERROR); } -Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const +Block Join::doJoinBlockHash(ProbeProcessInfo & probe_process_info) const { - std::vector result_blocks; - size_t result_rows = 0; - while (true) - { - probe_process_info.updateStartRow(); - /// this makes a copy of `probe_process_info.block` - Block block = probe_process_info.block; - size_t keys_size = key_names_left.size(); + probe_process_info.updateStartRow(); + /// this makes a copy of `probe_process_info.block` + Block block = probe_process_info.block; + size_t keys_size = key_names_left.size(); - /// Rare case, when keys are constant. To avoid code bloat, simply materialize them. - /// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function. - Columns materialized_columns; - ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(block, materialized_columns, key_names_left); + /// Rare case, when keys are constant. To avoid code bloat, simply materialize them. + /// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function. + Columns materialized_columns; + ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(block, materialized_columns, key_names_left); - /// Keys with NULL value in any column won't join to anything. - ColumnPtr null_map_holder; - ConstNullMapPtr null_map{}; - extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map); - /// reuse null_map to record the filtered rows, the rows contains NULL or does not - /// match the join filter won't join to anything - recordFilteredRows(block, non_equal_conditions.left_filter_column, null_map_holder, null_map); + /// Keys with NULL value in any column won't join to anything. + ColumnPtr null_map_holder; + ConstNullMapPtr null_map{}; + extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map); + /// reuse null_map to record the filtered rows, the rows contains NULL or does not + /// match the join filter won't join to anything + recordFilteredRows(block, non_equal_conditions.left_filter_column, null_map_holder, null_map); - size_t existing_columns = block.columns(); + size_t existing_columns = block.columns(); - /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized. + /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized. * Because if they are constants, then in the "not joined" rows, they may have different values * - default values, which can differ from the values of these constants. */ - if (getFullness(kind)) + if (getFullness(kind)) + { + for (size_t i = 0; i < existing_columns; ++i) { - for (size_t i = 0; i < existing_columns; ++i) - { - auto & col = block.getByPosition(i).column; + auto & col = block.getByPosition(i).column; - if (ColumnPtr converted = col->convertToFullColumnIfConst()) - col = converted; + if (ColumnPtr converted = col->convertToFullColumnIfConst()) + col = converted; - /// convert left columns (except keys) to Nullable - if (std::end(key_names_left) == std::find(key_names_left.begin(), key_names_left.end(), block.getByPosition(i).name)) - convertColumnToNullable(block.getByPosition(i)); - } + /// convert left columns (except keys) to Nullable + if (std::end(key_names_left) == std::find(key_names_left.begin(), key_names_left.end(), block.getByPosition(i).name)) + convertColumnToNullable(block.getByPosition(i)); } + } - /** For LEFT/INNER JOIN, the saved blocks do not contain keys. + /** For LEFT/INNER JOIN, the saved blocks do not contain keys. * For FULL/RIGHT JOIN, the saved blocks contain keys; * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. */ - size_t num_columns_to_skip = 0; - if (getFullness(kind)) - num_columns_to_skip = keys_size; + size_t num_columns_to_skip = 0; + if (getFullness(kind)) + num_columns_to_skip = keys_size; - /// Add new columns to the block. - size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); + /// Add new columns to the block. + size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); - std::vector right_table_column_indexes; - right_table_column_indexes.reserve(num_columns_to_add); + std::vector right_table_column_indexes; + right_table_column_indexes.reserve(num_columns_to_add); - for (size_t i = 0; i < num_columns_to_add; ++i) - { - right_table_column_indexes.push_back(i + existing_columns); - } + for (size_t i = 0; i < num_columns_to_add; ++i) + { + right_table_column_indexes.push_back(i + existing_columns); + } - MutableColumns added_columns; - added_columns.reserve(num_columns_to_add); + MutableColumns added_columns; + added_columns.reserve(num_columns_to_add); - std::vector right_indexes; - right_indexes.reserve(num_columns_to_add); + std::vector right_indexes; + right_indexes.reserve(num_columns_to_add); - for (size_t i = 0; i < num_columns_to_add; ++i) - { - const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); - RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name); + for (size_t i = 0; i < num_columns_to_add; ++i) + { + const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); + RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name); - added_columns.push_back(src_column.column->cloneEmpty()); - added_columns.back()->reserve(src_column.column->size()); - right_indexes.push_back(num_columns_to_skip + i); - } + added_columns.push_back(src_column.column->cloneEmpty()); + added_columns.back()->reserve(src_column.column->size()); + right_indexes.push_back(num_columns_to_skip + i); + } - size_t rows = block.rows(); + size_t rows = block.rows(); - /// Used with ANY INNER JOIN - std::unique_ptr filter; + /// Used with ANY INNER JOIN + std::unique_ptr filter; - if (((kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Right) && strictness == ASTTableJoin::Strictness::Any) - || kind == ASTTableJoin::Kind::Anti) - filter = std::make_unique(rows); + if (((kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Right) && strictness == ASTTableJoin::Strictness::Any) + || kind == ASTTableJoin::Kind::Anti) + filter = std::make_unique(rows); - /// Used with ALL ... JOIN - IColumn::Offset current_offset = 0; - std::unique_ptr offsets_to_replicate; + /// Used with ALL ... JOIN + IColumn::Offset current_offset = 0; + std::unique_ptr offsets_to_replicate; - if (strictness == ASTTableJoin::Strictness::All) - offsets_to_replicate = std::make_unique(rows); + if (strictness == ASTTableJoin::Strictness::All) + offsets_to_replicate = std::make_unique(rows); - bool enable_spill_join = isEnableSpill(); - JoinBuildInfo join_build_info{enable_fine_grained_shuffle, fine_grained_shuffle_count, enable_spill_join, is_spilled, build_concurrency, restore_round}; - JoinPartition::probeBlock(partitions, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators, join_build_info, probe_process_info); - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint); - for (size_t i = 0; i < num_columns_to_add; ++i) + bool enable_spill_join = isEnableSpill(); + JoinBuildInfo join_build_info{enable_fine_grained_shuffle, fine_grained_shuffle_count, enable_spill_join, is_spilled, build_concurrency, restore_round}; + JoinPartition::probeBlock(partitions, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators, join_build_info, probe_process_info); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint); + for (size_t i = 0; i < num_columns_to_add; ++i) + { + const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i); + block.insert(ColumnWithTypeAndName(std::move(added_columns[i]), sample_col.type, sample_col.name)); + } + + size_t process_rows = probe_process_info.end_row - probe_process_info.start_row; + + // if rows equal 0, we could ignore filter and offsets_to_replicate, and do not need to update start row. + if (likely(rows != 0)) + { + /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones. + if (filter && !(kind == ASTTableJoin::Kind::Anti && strictness == ASTTableJoin::Strictness::All)) { - const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i); - block.insert(ColumnWithTypeAndName(std::move(added_columns[i]), sample_col.type, sample_col.name)); + // If ANY INNER | RIGHT JOIN, the result will not be spilt, so the block rows must equal process_rows. + RUNTIME_CHECK(rows == process_rows); + for (size_t i = 0; i < existing_columns; ++i) + block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1); } - size_t process_rows = probe_process_info.end_row - probe_process_info.start_row; - - // if rows equal 0, we could ignore filter and offsets_to_replicate, and do not need to update start row. - if (likely(rows != 0)) + /// If ALL ... JOIN - we replicate all the columns except the new ones. + if (offsets_to_replicate) { - /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones. - if (filter && !(kind == ASTTableJoin::Kind::Anti && strictness == ASTTableJoin::Strictness::All)) + for (size_t i = 0; i < existing_columns; ++i) { - // If ANY INNER | RIGHT JOIN, the result will not be spilt, so the block rows must equal process_rows. - RUNTIME_CHECK(rows == process_rows); - for (size_t i = 0; i < existing_columns; ++i) - block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1); + block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicateRange(probe_process_info.start_row, probe_process_info.end_row, *offsets_to_replicate); } - /// If ALL ... JOIN - we replicate all the columns except the new ones. - if (offsets_to_replicate) + if (rows != process_rows) { - for (size_t i = 0; i < existing_columns; ++i) + if (isLeftSemiFamily(kind)) { - block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicateRange(probe_process_info.start_row, probe_process_info.end_row, *offsets_to_replicate); - } - - if (rows != process_rows) - { - if (isLeftSemiFamily(kind)) - { - auto helper_col = block.getByName(match_helper_name).column; - helper_col = helper_col->cut(probe_process_info.start_row, probe_process_info.end_row); - } - offsets_to_replicate->assign(offsets_to_replicate->begin() + probe_process_info.start_row, offsets_to_replicate->begin() + probe_process_info.end_row); + auto helper_col = block.getByName(match_helper_name).column; + helper_col = helper_col->cut(probe_process_info.start_row, probe_process_info.end_row); } + offsets_to_replicate->assign(offsets_to_replicate->begin() + probe_process_info.start_row, offsets_to_replicate->begin() + probe_process_info.end_row); } } + } - /// handle other conditions - if (!non_equal_conditions.other_cond_name.empty() || !non_equal_conditions.other_eq_cond_from_in_name.empty()) - { - if (!offsets_to_replicate) - throw Exception("Should not reach here, the strictness of join with other condition must be ALL"); - handleOtherConditions(block, filter, offsets_to_replicate, right_table_column_indexes); - } + /// handle other conditions + if (!non_equal_conditions.other_cond_name.empty() || !non_equal_conditions.other_eq_cond_from_in_name.empty()) + { + if (!offsets_to_replicate) + throw Exception("Should not reach here, the strictness of join with other condition must be ALL"); + handleOtherConditions(block, filter, offsets_to_replicate, right_table_column_indexes); + } + + return block; +} + +Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const +{ + std::vector result_blocks; + size_t result_rows = 0; + while (true) + { + auto block = doJoinBlockHash(probe_process_info); result_rows += block.rows(); /// exit the while loop if /// 1. probe_process_info.all_rows_joined_finish is true, which means all the rows in current block is processed diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index e4b167e4168..60b7c3dc730 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -378,6 +378,7 @@ class Join void insertFromBlockInternal(Block * stored_block, size_t stream_index); Block joinBlockHash(ProbeProcessInfo & probe_process_info) const; + Block doJoinBlockHash(ProbeProcessInfo & probe_process_info) const; Block joinBlockNullAware(ProbeProcessInfo & probe_process_info) const; From 2d2518156b0deb9aca1042d45837dd5d89781ce4 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 11 Apr 2023 15:40:29 +0800 Subject: [PATCH 4/9] Update dbms/src/Interpreters/Join.cpp Co-authored-by: SeaRise --- dbms/src/Interpreters/Join.cpp | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 25870430663..c857ea02cf6 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -989,15 +989,7 @@ Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const result_blocks.push_back(std::move(block)); } assert(!result_blocks.empty()); - if (result_blocks.size() == 1) - { - return result_blocks[0]; - } - else - { - return vstackBlocks(std::move(result_blocks)); - } -} + vstackBlocks(std::move(result_blocks)); namespace { From 61398614a50f55899ba6bf6913dda7620d9c42cb Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 12 Apr 2023 17:36:10 +0800 Subject: [PATCH 5/9] fix build Signed-off-by: xufei --- dbms/src/Interpreters/Join.cpp | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index c857ea02cf6..b27c9d7e272 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -975,21 +975,18 @@ Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const while (true) { auto block = doJoinBlockHash(probe_process_info); + assert(block); result_rows += block.rows(); + result_blocks.push_back(std::move(block)); /// exit the while loop if /// 1. probe_process_info.all_rows_joined_finish is true, which means all the rows in current block is processed /// 2. result_rows exceeds the min_result_block_size if (probe_process_info.all_rows_joined_finish || result_rows >= probe_process_info.min_result_block_size) - { - if (result_blocks.empty()) - return block; - result_blocks.push_back(std::move(block)); break; - } - result_blocks.push_back(std::move(block)); } assert(!result_blocks.empty()); - vstackBlocks(std::move(result_blocks)); + return vstackBlocks(std::move(result_blocks)); +} namespace { From 08199000c6ded38064225ea89ca293dc3c2ef18d Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 12 Apr 2023 17:56:53 +0800 Subject: [PATCH 6/9] more refine Signed-off-by: xufei --- dbms/src/Interpreters/Join.cpp | 7 ++++--- dbms/src/Interpreters/Join.h | 3 ++- dbms/src/Interpreters/JoinUtils.cpp | 16 ++++++++++++++++ dbms/src/Interpreters/JoinUtils.h | 3 +++ 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index b27c9d7e272..e440977a577 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -130,6 +130,8 @@ Join::Join( , match_helper_name(match_helper_name) , kind(kind_) , strictness(strictness_) + , original_strictness(strictness) + , may_block_expanded_after_join_block(mayBlockExpandedAfterJoinBlock(kind, strictness)) , key_names_left(key_names_left_) , key_names_right(key_names_right_) , build_concurrency(0) @@ -138,7 +140,6 @@ Join::Join( , active_probe_threads(0) , collators(collators_) , non_equal_conditions(non_equal_conditions_) - , original_strictness(strictness) , max_block_size(max_block_size_) , max_bytes_before_external_join(max_bytes_before_external_join_) , build_spill_config(build_spill_config_) @@ -980,8 +981,8 @@ Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const result_blocks.push_back(std::move(block)); /// exit the while loop if /// 1. probe_process_info.all_rows_joined_finish is true, which means all the rows in current block is processed - /// 2. result_rows exceeds the min_result_block_size - if (probe_process_info.all_rows_joined_finish || result_rows >= probe_process_info.min_result_block_size) + /// 2. the block may be expanded after join and result_rows exceeds the min_result_block_size + if (probe_process_info.all_rows_joined_finish || (may_block_expanded_after_join_block && result_rows >= probe_process_info.min_result_block_size)) break; } assert(!result_blocks.empty()); diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 60b7c3dc730..b4e823ef808 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -265,6 +265,8 @@ class Join ASTTableJoin::Kind kind; ASTTableJoin::Strictness strictness; + ASTTableJoin::Strictness original_strictness; + const bool may_block_expanded_after_join_block; /// Names of key columns (columns for equi-JOIN) in "left" table (in the order they appear in USING clause). const Names key_names_left; @@ -290,7 +292,6 @@ class Join const JoinNonEqualConditions non_equal_conditions; - ASTTableJoin::Strictness original_strictness; size_t max_block_size; /** Blocks of "right" table. */ diff --git a/dbms/src/Interpreters/JoinUtils.cpp b/dbms/src/Interpreters/JoinUtils.cpp index 8cbe6f72b1b..e62673575bf 100644 --- a/dbms/src/Interpreters/JoinUtils.cpp +++ b/dbms/src/Interpreters/JoinUtils.cpp @@ -66,4 +66,20 @@ void computeDispatchHash(size_t rows, data[i] = updateHashValue(join_restore_round, data[i]); } } + +bool mayBlockExpandedAfterJoinBlock(ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness) +{ + /// null aware semi/left semi/anti join never expand the block + if (isNullAwareSemiFamily(kind)) + return false; + if (isLeftSemiFamily(kind)) + return false; + if (isAntiJoin(kind)) + return false; + /// strictness == Any means semi join, it never expand the block + if (strictness == ASTTableJoin::Strictness::Any) + return false; + /// for all the other cases, return true by default + return true; +} } // namespace DB diff --git a/dbms/src/Interpreters/JoinUtils.h b/dbms/src/Interpreters/JoinUtils.h index fe4f721d4da..4b55cfdf7e0 100644 --- a/dbms/src/Interpreters/JoinUtils.h +++ b/dbms/src/Interpreters/JoinUtils.h @@ -58,6 +58,9 @@ inline bool isNullAwareSemiFamily(ASTTableJoin::Kind kind) return kind == ASTTableJoin::Kind::NullAware_Anti || kind == ASTTableJoin::Kind::NullAware_LeftAnti || kind == ASTTableJoin::Kind::NullAware_LeftSemi; } + +bool mayBlockExpandedAfterJoinBlock(ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness); + struct ProbeProcessInfo { Block block; From 43d6f26619a408f7db8cfb5bb14a58d90fcaa89b Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 12 Apr 2023 18:14:51 +0800 Subject: [PATCH 7/9] rename Signed-off-by: xufei --- dbms/src/Interpreters/Join.cpp | 4 ++-- dbms/src/Interpreters/Join.h | 2 +- dbms/src/Interpreters/JoinUtils.cpp | 2 +- dbms/src/Interpreters/JoinUtils.h | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index e440977a577..29a4f4e57ca 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -131,7 +131,7 @@ Join::Join( , kind(kind_) , strictness(strictness_) , original_strictness(strictness) - , may_block_expanded_after_join_block(mayBlockExpandedAfterJoinBlock(kind, strictness)) + , may_probe_side_expanded_after_join(mayProbeSideExpandedAfterJoin(kind, strictness)) , key_names_left(key_names_left_) , key_names_right(key_names_right_) , build_concurrency(0) @@ -982,7 +982,7 @@ Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const /// exit the while loop if /// 1. probe_process_info.all_rows_joined_finish is true, which means all the rows in current block is processed /// 2. the block may be expanded after join and result_rows exceeds the min_result_block_size - if (probe_process_info.all_rows_joined_finish || (may_block_expanded_after_join_block && result_rows >= probe_process_info.min_result_block_size)) + if (probe_process_info.all_rows_joined_finish || (may_probe_side_expanded_after_join && result_rows >= probe_process_info.min_result_block_size)) break; } assert(!result_blocks.empty()); diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index b4e823ef808..960a4fc4589 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -266,7 +266,7 @@ class Join ASTTableJoin::Kind kind; ASTTableJoin::Strictness strictness; ASTTableJoin::Strictness original_strictness; - const bool may_block_expanded_after_join_block; + const bool may_probe_side_expanded_after_join; /// Names of key columns (columns for equi-JOIN) in "left" table (in the order they appear in USING clause). const Names key_names_left; diff --git a/dbms/src/Interpreters/JoinUtils.cpp b/dbms/src/Interpreters/JoinUtils.cpp index e62673575bf..4cd13d0c52c 100644 --- a/dbms/src/Interpreters/JoinUtils.cpp +++ b/dbms/src/Interpreters/JoinUtils.cpp @@ -67,7 +67,7 @@ void computeDispatchHash(size_t rows, } } -bool mayBlockExpandedAfterJoinBlock(ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness) +bool mayProbeSideExpandedAfterJoin(ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness) { /// null aware semi/left semi/anti join never expand the block if (isNullAwareSemiFamily(kind)) diff --git a/dbms/src/Interpreters/JoinUtils.h b/dbms/src/Interpreters/JoinUtils.h index 4b55cfdf7e0..8f60f1e4760 100644 --- a/dbms/src/Interpreters/JoinUtils.h +++ b/dbms/src/Interpreters/JoinUtils.h @@ -59,7 +59,7 @@ inline bool isNullAwareSemiFamily(ASTTableJoin::Kind kind) || kind == ASTTableJoin::Kind::NullAware_LeftSemi; } -bool mayBlockExpandedAfterJoinBlock(ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness); +bool mayProbeSideExpandedAfterJoin(ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness); struct ProbeProcessInfo { From a7dcd7f7983bfacae88412d49feef57f6b2c802b Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 12 Apr 2023 18:16:25 +0800 Subject: [PATCH 8/9] save work Signed-off-by: xufei --- dbms/src/Interpreters/JoinUtils.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Interpreters/JoinUtils.cpp b/dbms/src/Interpreters/JoinUtils.cpp index 4cd13d0c52c..d20c1d0bf65 100644 --- a/dbms/src/Interpreters/JoinUtils.cpp +++ b/dbms/src/Interpreters/JoinUtils.cpp @@ -69,14 +69,14 @@ void computeDispatchHash(size_t rows, bool mayProbeSideExpandedAfterJoin(ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness) { - /// null aware semi/left semi/anti join never expand the block + /// null aware semi/left semi/anti join never expand the probe side if (isNullAwareSemiFamily(kind)) return false; if (isLeftSemiFamily(kind)) return false; if (isAntiJoin(kind)) return false; - /// strictness == Any means semi join, it never expand the block + /// strictness == Any means semi join, it never expand the probe side if (strictness == ASTTableJoin::Strictness::Any) return false; /// for all the other cases, return true by default From 2b70eafa6c619aaa68c686ca51dbf56693b7c267 Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 13 Apr 2023 10:24:54 +0800 Subject: [PATCH 9/9] add more tests Signed-off-by: xufei --- dbms/src/Flash/tests/gtest_join_executor.cpp | 58 +++++++++++++------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_join_executor.cpp b/dbms/src/Flash/tests/gtest_join_executor.cpp index 44d2c4ec75e..3471741f5bb 100644 --- a/dbms/src/Flash/tests/gtest_join_executor.cpp +++ b/dbms/src/Flash/tests/gtest_join_executor.cpp @@ -806,11 +806,6 @@ try context.addMockTable("split_test", "t1", {{"a", TiDB::TP::TypeLong}, {"b", TiDB::TP::TypeLong}}, {toVec("a", {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}), toVec("b", {2, 2, 2, 2, 2, 2, 2, 2, 2, 2})}); context.addMockTable("split_test", "t2", {{"a", TiDB::TP::TypeLong}, {"c", TiDB::TP::TypeLong}}, {toVec("a", {1, 1, 1, 1, 1}), toVec("c", {1, 2, 3, 4, 5})}); - auto request = context - .scan("split_test", "t1") - .join(context.scan("split_test", "t2"), tipb::JoinType::TypeInnerJoin, {col("a")}, {}, {}, {gt(col("b"), col("c"))}, {}) - .build(context); - std::vector block_sizes{ 1, 2, @@ -820,23 +815,46 @@ try 50, 51, DEFAULT_BLOCK_SIZE}; - std::vector> expect{ - {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, - {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, - {4, 3, 2, 1}, - {5, 5}, - {9, 1}, - {10}, - {10}, - {10}}; - for (size_t i = 0; i < block_sizes.size(); ++i) + auto join_types = {tipb::JoinType::TypeInnerJoin, tipb::JoinType::TypeSemiJoin}; + std::vector>> expects{ + { + {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, + {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, + {4, 3, 2, 1}, + {5, 5}, + {9, 1}, + {10}, + {10}, + {10}, + }, + { + {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, + {2, 2, 2, 2, 2}, + {7, 3}, + {10}, + {10}, + {10}, + {10}, + {10}, + }, + }; + for (size_t index = 0; index < join_types.size(); index++) { - context.context->setSetting("max_block_size", Field(static_cast(block_sizes[i]))); - auto blocks = getExecuteStreamsReturnBlocks(request); - ASSERT_EQ(expect[i].size(), blocks.size()); - for (size_t j = 0; j < blocks.size(); ++j) + auto request = context + .scan("split_test", "t1") + .join(context.scan("split_test", "t2"), *(join_types.begin() + index), {col("a")}, {}, {}, {gt(col("b"), col("c"))}, {}) + .build(context); + auto & expect = expects[index]; + + for (size_t i = 0; i < block_sizes.size(); ++i) { - ASSERT_EQ(expect[i][j], blocks[j].rows()); + context.context->setSetting("max_block_size", Field(static_cast(block_sizes[i]))); + auto blocks = getExecuteStreamsReturnBlocks(request); + ASSERT_EQ(expect[i].size(), blocks.size()); + for (size_t j = 0; j < blocks.size(); ++j) + { + ASSERT_EQ(expect[i][j], blocks[j].rows()); + } } } }