From 5b23886792dc572b0ada1e016ec71329199a1ba6 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 11 Apr 2023 10:59:20 +0800 Subject: [PATCH] address comments Signed-off-by: xufei --- dbms/src/Interpreters/Join.cpp | 217 +++++++++++++++++---------------- dbms/src/Interpreters/Join.h | 1 + 2 files changed, 113 insertions(+), 105 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 5b34eef28cc..25870430663 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -821,153 +821,160 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr throw Exception("Logical error: unknown combination of JOIN", ErrorCodes::LOGICAL_ERROR); } -Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const +Block Join::doJoinBlockHash(ProbeProcessInfo & probe_process_info) const { - std::vector result_blocks; - size_t result_rows = 0; - while (true) - { - probe_process_info.updateStartRow(); - /// this makes a copy of `probe_process_info.block` - Block block = probe_process_info.block; - size_t keys_size = key_names_left.size(); + probe_process_info.updateStartRow(); + /// this makes a copy of `probe_process_info.block` + Block block = probe_process_info.block; + size_t keys_size = key_names_left.size(); - /// Rare case, when keys are constant. To avoid code bloat, simply materialize them. - /// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function. - Columns materialized_columns; - ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(block, materialized_columns, key_names_left); + /// Rare case, when keys are constant. To avoid code bloat, simply materialize them. + /// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function. 
+ Columns materialized_columns; + ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(block, materialized_columns, key_names_left); - /// Keys with NULL value in any column won't join to anything. - ColumnPtr null_map_holder; - ConstNullMapPtr null_map{}; - extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map); - /// reuse null_map to record the filtered rows, the rows contains NULL or does not - /// match the join filter won't join to anything - recordFilteredRows(block, non_equal_conditions.left_filter_column, null_map_holder, null_map); + /// Keys with NULL value in any column won't join to anything. + ColumnPtr null_map_holder; + ConstNullMapPtr null_map{}; + extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map); + /// reuse null_map to record the filtered rows: rows that contain NULL or do not + /// match the join filter won't join to anything + recordFilteredRows(block, non_equal_conditions.left_filter_column, null_map_holder, null_map); - size_t existing_columns = block.columns(); + size_t existing_columns = block.columns(); - /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized. + /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized. * Because if they are constants, then in the "not joined" rows, they may have different values * - default values, which can differ from the values of these constants. 
*/ - if (getFullness(kind)) + if (getFullness(kind)) + { + for (size_t i = 0; i < existing_columns; ++i) { - for (size_t i = 0; i < existing_columns; ++i) - { - auto & col = block.getByPosition(i).column; + auto & col = block.getByPosition(i).column; - if (ColumnPtr converted = col->convertToFullColumnIfConst()) - col = converted; + if (ColumnPtr converted = col->convertToFullColumnIfConst()) + col = converted; - /// convert left columns (except keys) to Nullable - if (std::end(key_names_left) == std::find(key_names_left.begin(), key_names_left.end(), block.getByPosition(i).name)) - convertColumnToNullable(block.getByPosition(i)); - } + /// convert left columns (except keys) to Nullable + if (std::end(key_names_left) == std::find(key_names_left.begin(), key_names_left.end(), block.getByPosition(i).name)) + convertColumnToNullable(block.getByPosition(i)); } + } - /** For LEFT/INNER JOIN, the saved blocks do not contain keys. + /** For LEFT/INNER JOIN, the saved blocks do not contain keys. * For FULL/RIGHT JOIN, the saved blocks contain keys; * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. */ - size_t num_columns_to_skip = 0; - if (getFullness(kind)) - num_columns_to_skip = keys_size; + size_t num_columns_to_skip = 0; + if (getFullness(kind)) + num_columns_to_skip = keys_size; - /// Add new columns to the block. - size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); + /// Add new columns to the block. 
+ size_t num_columns_to_add = sample_block_with_columns_to_add.columns(); - std::vector right_table_column_indexes; - right_table_column_indexes.reserve(num_columns_to_add); + std::vector right_table_column_indexes; + right_table_column_indexes.reserve(num_columns_to_add); - for (size_t i = 0; i < num_columns_to_add; ++i) - { - right_table_column_indexes.push_back(i + existing_columns); - } + for (size_t i = 0; i < num_columns_to_add; ++i) + { + right_table_column_indexes.push_back(i + existing_columns); + } - MutableColumns added_columns; - added_columns.reserve(num_columns_to_add); + MutableColumns added_columns; + added_columns.reserve(num_columns_to_add); - std::vector right_indexes; - right_indexes.reserve(num_columns_to_add); + std::vector right_indexes; + right_indexes.reserve(num_columns_to_add); - for (size_t i = 0; i < num_columns_to_add; ++i) - { - const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); - RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name); + for (size_t i = 0; i < num_columns_to_add; ++i) + { + const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); + RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name); - added_columns.push_back(src_column.column->cloneEmpty()); - added_columns.back()->reserve(src_column.column->size()); - right_indexes.push_back(num_columns_to_skip + i); - } + added_columns.push_back(src_column.column->cloneEmpty()); + added_columns.back()->reserve(src_column.column->size()); + right_indexes.push_back(num_columns_to_skip + i); + } - size_t rows = block.rows(); + size_t rows = block.rows(); - /// Used with ANY INNER JOIN - std::unique_ptr filter; + /// Used with ANY INNER JOIN + std::unique_ptr filter; - if 
(((kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Right) && strictness == ASTTableJoin::Strictness::Any) - || kind == ASTTableJoin::Kind::Anti) - filter = std::make_unique(rows); + if (((kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Right) && strictness == ASTTableJoin::Strictness::Any) + || kind == ASTTableJoin::Kind::Anti) + filter = std::make_unique(rows); - /// Used with ALL ... JOIN - IColumn::Offset current_offset = 0; - std::unique_ptr offsets_to_replicate; + /// Used with ALL ... JOIN + IColumn::Offset current_offset = 0; + std::unique_ptr offsets_to_replicate; - if (strictness == ASTTableJoin::Strictness::All) - offsets_to_replicate = std::make_unique(rows); + if (strictness == ASTTableJoin::Strictness::All) + offsets_to_replicate = std::make_unique(rows); - bool enable_spill_join = isEnableSpill(); - JoinBuildInfo join_build_info{enable_fine_grained_shuffle, fine_grained_shuffle_count, enable_spill_join, is_spilled, build_concurrency, restore_round}; - JoinPartition::probeBlock(partitions, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators, join_build_info, probe_process_info); - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint); - for (size_t i = 0; i < num_columns_to_add; ++i) + bool enable_spill_join = isEnableSpill(); + JoinBuildInfo join_build_info{enable_fine_grained_shuffle, fine_grained_shuffle_count, enable_spill_join, is_spilled, build_concurrency, restore_round}; + JoinPartition::probeBlock(partitions, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators, join_build_info, probe_process_info); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint); + for (size_t i = 0; i < num_columns_to_add; ++i) + { + const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i); + 
block.insert(ColumnWithTypeAndName(std::move(added_columns[i]), sample_col.type, sample_col.name)); + } + + size_t process_rows = probe_process_info.end_row - probe_process_info.start_row; + + // if rows equal 0, we could ignore filter and offsets_to_replicate, and do not need to update start row. + if (likely(rows != 0)) + { + /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones. + if (filter && !(kind == ASTTableJoin::Kind::Anti && strictness == ASTTableJoin::Strictness::All)) { - const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i); - block.insert(ColumnWithTypeAndName(std::move(added_columns[i]), sample_col.type, sample_col.name)); + // If ANY INNER | RIGHT JOIN, the result will not be split, so the block rows must equal process_rows. + RUNTIME_CHECK(rows == process_rows); + for (size_t i = 0; i < existing_columns; ++i) + block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1); } - size_t process_rows = probe_process_info.end_row - probe_process_info.start_row; - - // if rows equal 0, we could ignore filter and offsets_to_replicate, and do not need to update start row. - if (likely(rows != 0)) + /// If ALL ... JOIN - we replicate all the columns except the new ones. + if (offsets_to_replicate) { - /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones. - if (filter && !(kind == ASTTableJoin::Kind::Anti && strictness == ASTTableJoin::Strictness::All)) + for (size_t i = 0; i < existing_columns; ++i) { - // If ANY INNER | RIGHT JOIN, the result will not be spilt, so the block rows must equal process_rows. 
- RUNTIME_CHECK(rows == process_rows); - for (size_t i = 0; i < existing_columns; ++i) - block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1); + block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicateRange(probe_process_info.start_row, probe_process_info.end_row, *offsets_to_replicate); } - /// If ALL ... JOIN - we replicate all the columns except the new ones. - if (offsets_to_replicate) + if (rows != process_rows) { - for (size_t i = 0; i < existing_columns; ++i) + if (isLeftSemiFamily(kind)) { - block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicateRange(probe_process_info.start_row, probe_process_info.end_row, *offsets_to_replicate); - } - - if (rows != process_rows) - { - if (isLeftSemiFamily(kind)) - { - auto helper_col = block.getByName(match_helper_name).column; - helper_col = helper_col->cut(probe_process_info.start_row, probe_process_info.end_row); - } - offsets_to_replicate->assign(offsets_to_replicate->begin() + probe_process_info.start_row, offsets_to_replicate->begin() + probe_process_info.end_row); + auto helper_col = block.getByName(match_helper_name).column; + helper_col = helper_col->cut(probe_process_info.start_row, probe_process_info.end_row); } + offsets_to_replicate->assign(offsets_to_replicate->begin() + probe_process_info.start_row, offsets_to_replicate->begin() + probe_process_info.end_row); } } + } - /// handle other conditions - if (!non_equal_conditions.other_cond_name.empty() || !non_equal_conditions.other_eq_cond_from_in_name.empty()) - { - if (!offsets_to_replicate) - throw Exception("Should not reach here, the strictness of join with other condition must be ALL"); - handleOtherConditions(block, filter, offsets_to_replicate, right_table_column_indexes); - } + /// handle other conditions + if (!non_equal_conditions.other_cond_name.empty() || !non_equal_conditions.other_eq_cond_from_in_name.empty()) + { + if (!offsets_to_replicate) + throw 
Exception("Should not reach here, the strictness of join with other condition must be ALL"); + handleOtherConditions(block, filter, offsets_to_replicate, right_table_column_indexes); + } + + return block; +} + +Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const +{ + std::vector result_blocks; + size_t result_rows = 0; + while (true) + { + auto block = doJoinBlockHash(probe_process_info); result_rows += block.rows(); /// exit the while loop if /// 1. probe_process_info.all_rows_joined_finish is true, which means all the rows in current block is processed diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index e4b167e4168..60b7c3dc730 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -378,6 +378,7 @@ class Join void insertFromBlockInternal(Block * stored_block, size_t stream_index); Block joinBlockHash(ProbeProcessInfo & probe_process_info) const; + Block doJoinBlockHash(ProbeProcessInfo & probe_process_info) const; Block joinBlockNullAware(ProbeProcessInfo & probe_process_info) const;