diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 42234a2d541..5e0e72972b5 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -1731,9 +1731,24 @@ struct CrossJoinAdder; template struct CrossJoinAdder { - static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join) + static size_t calTotalRightRows(const BlocksList & blocks) + { + size_t total_rows = 0; + for (const Block & block_right : blocks) + { + size_t rows_right = block_right.rows(); + total_rows += rows_right; + } + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) + total_rows = std::min(total_rows, 1); + return total_rows; + } + static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join, size_t total_right_rows, IColumn::Offsets & offsets_for_replication) { size_t expanded_row_size = 0; + for (size_t col_num = 0; col_num < num_existing_columns; ++col_num) + dst_columns[col_num]->insertRangeFrom(*src_left_columns[col_num]->replicateRange(i, i + 1, offsets_for_replication), 0, total_right_rows); + for (const Block & block_right : blocks) { size_t rows_right = block_right.rows(); @@ -1742,16 +1757,10 @@ struct CrossJoinAdder rows_right = std::min(rows_right, 1); } - for (size_t col_num = 0; col_num < num_existing_columns; ++col_num) - for (size_t j = 0; j < rows_right; ++j) - dst_columns[col_num]->insertFrom(*src_left_columns[col_num], i); - for (size_t col_num = 0; col_num < num_columns_to_add; ++col_num) { const IColumn * column_right = block_right.getByPosition(col_num).column.get(); - - for (size_t j = 0; j < rows_right; ++j) - dst_columns[num_existing_columns + col_num]->insertFrom(*column_right, j); + dst_columns[num_existing_columns + col_num]->insertRangeFrom(*column_right, 0, rows_right); } expanded_row_size += rows_right; if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) @@ -1778,9 +1787,9 @@ struct CrossJoinAdder template struct CrossJoinAdder { - static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join) + static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join, size_t total_right_rows, IColumn::Offsets & offsets_for_replication) { - CrossJoinAdder::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join); + CrossJoinAdder::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join, total_right_rows, offsets_for_replication); } static void addNotFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join) { @@ -1801,7 +1810,7 @@ struct CrossJoinAdder template <> struct CrossJoinAdder { - static void addFound(MutableColumns & /* dst_columns */, size_t /* num_existing_columns */, ColumnRawPtrs & /* src_left_columns */, size_t /* num_columns_to_add */, size_t start_offset, size_t i, const BlocksList & /* blocks */, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join) + static void addFound(MutableColumns & /* dst_columns */, size_t /* num_existing_columns */, ColumnRawPtrs & /* src_left_columns */, size_t /* num_columns_to_add */, size_t start_offset, size_t i, const BlocksList & /* blocks */, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join, size_t /* total_right_rows */, IColumn::Offsets & /* offsets_for_replication */) { (*is_row_matched)[i - start_offset] = 0; (*expanded_row_size_after_join)[i - start_offset] = current_offset; @@ -1818,9 +1827,9 @@ struct CrossJoinAdder struct CrossJoinAdder { - static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join) + static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join, size_t total_right_rows, IColumn::Offsets & offsets_for_replication) { - CrossJoinAdder::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join); + CrossJoinAdder::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join, total_right_rows, offsets_for_replication); } static void addNotFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join) { @@ -1834,9 +1843,9 @@ struct CrossJoinAdder struct CrossJoinAdder { - static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join) + static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join, size_t total_right_rows, IColumn::Offsets & offsets_for_replication) { - CrossJoinAdder::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add - 1, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join); + CrossJoinAdder::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add - 1, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join, total_right_rows, offsets_for_replication); dst_columns[num_existing_columns + num_columns_to_add - 1]->insert(FIELD_INT8_1); } static void addNotFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join) @@ -1893,6 +1902,8 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[ right_column_index.push_back(num_existing_columns + i); std::vector result_blocks; + auto total_right_rows = CrossJoinAdder::calTotalRightRows(blocks); + IColumn::Offsets offsets_for_replication(rows_left, total_right_rows); // Used for IColumn::replicateRange, which help replicate each left row by total_right_rows times for (size_t start = 0; start <= rows_left; start += left_rows_per_iter) { size_t end = std::min(start + left_rows_per_iter, rows_left); @@ -1900,6 +1911,9 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[ for (size_t i = 0; i < block.columns(); ++i) { dst_columns[i] = block.getByPosition(i).column->cloneEmpty(); + size_t reserved_rows = total_right_rows * (end - start); + if likely (reserved_rows > 0) + dst_columns[i]->reserve(reserved_rows); } IColumn::Offset current_offset = 0; std::unique_ptr is_row_matched = std::make_unique(end - start); @@ -1917,7 +1931,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[ } if (right_table_rows > 0) { - CrossJoinAdder::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start, i, blocks, is_row_matched.get(), current_offset, expanded_row_size_after_join.get()); + CrossJoinAdder::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start, i, blocks, is_row_matched.get(), current_offset, expanded_row_size_after_join.get(), total_right_rows, offsets_for_replication); } else { @@ -1938,23 +1952,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[ } else { - auto & sample_block = result_blocks[0]; - MutableColumns dst_columns(sample_block.columns()); - for (size_t i = 0; i < sample_block.columns(); ++i) - { - dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty(); - } - for (auto & current_block : result_blocks) - { - if (current_block.rows() > 0) - { - for (size_t column = 0; column < current_block.columns(); column++) - { - dst_columns[column]->insertRangeFrom(*current_block.getByPosition(column).column, 0, current_block.rows()); - } - } - } - block = sample_block.cloneWithColumns(std::move(dst_columns)); + block = mergeBlocks(std::move(result_blocks)); } }