Skip to content

Commit

Permalink
Optimize cross join impl (#6707)
Browse files Browse the repository at this point in the history
close #6706
  • Loading branch information
yibin87 authored Feb 2, 2023
1 parent 632fdaa commit 1bad64f
Showing 1 changed file with 31 additions and 33 deletions.
64 changes: 31 additions & 33 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1731,9 +1731,24 @@ struct CrossJoinAdder;
template <ASTTableJoin::Strictness STRICTNESS>
struct CrossJoinAdder<ASTTableJoin::Kind::Cross, STRICTNESS>
{
static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join)
/// Returns the number of right-table rows that each left row is expanded by.
/// Sums rows() over every block in `blocks`; when STRICTNESS is
/// ASTTableJoin::Strictness::Any the result is capped at 1.
static size_t calTotalRightRows(const BlocksList & blocks)
{
    size_t total_rows = 0;
    for (const Block & block_right : blocks)
        total_rows += block_right.rows();

    if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
    {
        // Spell out <size_t>: the literal 1 is an int, and std::min cannot deduce
        // a single template type from a (size_t, int) argument pair.
        total_rows = std::min<size_t>(total_rows, 1);
    }
    return total_rows;
}
static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join, size_t total_right_rows, IColumn::Offsets & offsets_for_replication)
{
size_t expanded_row_size = 0;
for (size_t col_num = 0; col_num < num_existing_columns; ++col_num)
dst_columns[col_num]->insertRangeFrom(*src_left_columns[col_num]->replicateRange(i, i + 1, offsets_for_replication), 0, total_right_rows);

for (const Block & block_right : blocks)
{
size_t rows_right = block_right.rows();
Expand All @@ -1742,16 +1757,10 @@ struct CrossJoinAdder<ASTTableJoin::Kind::Cross, STRICTNESS>
rows_right = std::min(rows_right, 1);
}

for (size_t col_num = 0; col_num < num_existing_columns; ++col_num)
for (size_t j = 0; j < rows_right; ++j)
dst_columns[col_num]->insertFrom(*src_left_columns[col_num], i);

for (size_t col_num = 0; col_num < num_columns_to_add; ++col_num)
{
const IColumn * column_right = block_right.getByPosition(col_num).column.get();

for (size_t j = 0; j < rows_right; ++j)
dst_columns[num_existing_columns + col_num]->insertFrom(*column_right, j);
dst_columns[num_existing_columns + col_num]->insertRangeFrom(*column_right, 0, rows_right);
}
expanded_row_size += rows_right;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
Expand All @@ -1778,9 +1787,9 @@ struct CrossJoinAdder<ASTTableJoin::Kind::Cross, STRICTNESS>
template <ASTTableJoin::Strictness STRICTNESS>
struct CrossJoinAdder<ASTTableJoin::Kind::Cross_Left, STRICTNESS>
{
static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join)
static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join, size_t total_right_rows, IColumn::Offsets & offsets_for_replication)
{
CrossJoinAdder<ASTTableJoin::Kind::Cross, STRICTNESS>::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join);
CrossJoinAdder<ASTTableJoin::Kind::Cross, STRICTNESS>::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join, total_right_rows, offsets_for_replication);
}
static void addNotFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join)
{
Expand All @@ -1801,7 +1810,7 @@ struct CrossJoinAdder<ASTTableJoin::Kind::Cross_Left, STRICTNESS>
template <>
struct CrossJoinAdder<ASTTableJoin::Kind::Cross_Anti, ASTTableJoin::Strictness::Any>
{
static void addFound(MutableColumns & /* dst_columns */, size_t /* num_existing_columns */, ColumnRawPtrs & /* src_left_columns */, size_t /* num_columns_to_add */, size_t start_offset, size_t i, const BlocksList & /* blocks */, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join)
static void addFound(MutableColumns & /* dst_columns */, size_t /* num_existing_columns */, ColumnRawPtrs & /* src_left_columns */, size_t /* num_columns_to_add */, size_t start_offset, size_t i, const BlocksList & /* blocks */, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join, size_t /* total_right_rows */, IColumn::Offsets & /* offsets_for_replication */)
{
(*is_row_matched)[i - start_offset] = 0;
(*expanded_row_size_after_join)[i - start_offset] = current_offset;
Expand All @@ -1818,9 +1827,9 @@ struct CrossJoinAdder<ASTTableJoin::Kind::Cross_Anti, ASTTableJoin::Strictness::
template <>
struct CrossJoinAdder<ASTTableJoin::Kind::Cross_Anti, ASTTableJoin::Strictness::All>
{
static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join)
static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join, size_t total_right_rows, IColumn::Offsets & offsets_for_replication)
{
CrossJoinAdder<ASTTableJoin::Kind::Cross, ASTTableJoin::Strictness::All>::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join);
CrossJoinAdder<ASTTableJoin::Kind::Cross, ASTTableJoin::Strictness::All>::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join, total_right_rows, offsets_for_replication);
}
static void addNotFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join)
{
Expand All @@ -1834,9 +1843,9 @@ struct CrossJoinAdder<ASTTableJoin::Kind::Cross_Anti, ASTTableJoin::Strictness::
template <ASTTableJoin::Strictness STRICTNESS>
struct CrossJoinAdder<ASTTableJoin::Kind::Cross_LeftSemi, STRICTNESS>
{
static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join)
static void addFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, const BlocksList & blocks, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join, size_t total_right_rows, IColumn::Offsets & offsets_for_replication)
{
CrossJoinAdder<ASTTableJoin::Kind::Cross, STRICTNESS>::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add - 1, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join);
CrossJoinAdder<ASTTableJoin::Kind::Cross, STRICTNESS>::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add - 1, start_offset, i, blocks, is_row_matched, current_offset, expanded_row_size_after_join, total_right_rows, offsets_for_replication);
dst_columns[num_existing_columns + num_columns_to_add - 1]->insert(FIELD_INT8_1);
}
static void addNotFound(MutableColumns & dst_columns, size_t num_existing_columns, ColumnRawPtrs & src_left_columns, size_t num_columns_to_add, size_t start_offset, size_t i, IColumn::Filter * is_row_matched, IColumn::Offset & current_offset, IColumn::Offsets * expanded_row_size_after_join)
Expand Down Expand Up @@ -1893,13 +1902,18 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
right_column_index.push_back(num_existing_columns + i);

std::vector<Block> result_blocks;
auto total_right_rows = CrossJoinAdder<ASTTableJoin::Kind::Cross, STRICTNESS>::calTotalRightRows(blocks);
IColumn::Offsets offsets_for_replication(rows_left, total_right_rows); // Used for IColumn::replicateRange, which help replicate each left row by total_right_rows times
for (size_t start = 0; start <= rows_left; start += left_rows_per_iter)
{
size_t end = std::min(start + left_rows_per_iter, rows_left);
MutableColumns dst_columns(block.columns());
for (size_t i = 0; i < block.columns(); ++i)
{
dst_columns[i] = block.getByPosition(i).column->cloneEmpty();
size_t reserved_rows = total_right_rows * (end - start);
if likely (reserved_rows > 0)
dst_columns[i]->reserve(reserved_rows);
}
IColumn::Offset current_offset = 0;
std::unique_ptr<IColumn::Filter> is_row_matched = std::make_unique<IColumn::Filter>(end - start);
Expand All @@ -1917,7 +1931,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
}
if (right_table_rows > 0)
{
CrossJoinAdder<KIND, STRICTNESS>::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start, i, blocks, is_row_matched.get(), current_offset, expanded_row_size_after_join.get());
CrossJoinAdder<KIND, STRICTNESS>::addFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start, i, blocks, is_row_matched.get(), current_offset, expanded_row_size_after_join.get(), total_right_rows, offsets_for_replication);
}
else
{
Expand All @@ -1938,23 +1952,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
}
else
{
auto & sample_block = result_blocks[0];
MutableColumns dst_columns(sample_block.columns());
for (size_t i = 0; i < sample_block.columns(); ++i)
{
dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty();
}
for (auto & current_block : result_blocks)
{
if (current_block.rows() > 0)
{
for (size_t column = 0; column < current_block.columns(); column++)
{
dst_columns[column]->insertRangeFrom(*current_block.getByPosition(column).column, 0, current_block.rows());
}
}
}
block = sample_block.cloneWithColumns(std::move(dst_columns));
block = mergeBlocks(std::move(result_blocks));
}
}

Expand Down

0 comments on commit 1bad64f

Please sign in to comment.