Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <[email protected]>
  • Loading branch information
windtalker committed Apr 11, 2023
1 parent 30befd0 commit 5b23886
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 105 deletions.
217 changes: 112 additions & 105 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -821,153 +821,160 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
throw Exception("Logical error: unknown combination of JOIN", ErrorCodes::LOGICAL_ERROR);
}

Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const
Block Join::doJoinBlockHash(ProbeProcessInfo & probe_process_info) const
{
std::vector<Block> result_blocks;
size_t result_rows = 0;
while (true)
{
probe_process_info.updateStartRow();
/// this makes a copy of `probe_process_info.block`
Block block = probe_process_info.block;
size_t keys_size = key_names_left.size();
probe_process_info.updateStartRow();
/// this makes a copy of `probe_process_info.block`
Block block = probe_process_info.block;
size_t keys_size = key_names_left.size();

/// Rare case, when keys are constant. To avoid code bloat, simply materialize them.
/// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function.
Columns materialized_columns;
ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(block, materialized_columns, key_names_left);
/// Rare case, when keys are constant. To avoid code bloat, simply materialize them.
/// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function.
Columns materialized_columns;
ColumnRawPtrs key_columns = extractAndMaterializeKeyColumns(block, materialized_columns, key_names_left);

/// Keys with NULL value in any column won't join to anything.
ColumnPtr null_map_holder;
ConstNullMapPtr null_map{};
extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map);
/// reuse null_map to record the filtered rows, the rows contains NULL or does not
/// match the join filter won't join to anything
recordFilteredRows(block, non_equal_conditions.left_filter_column, null_map_holder, null_map);
/// Keys with NULL value in any column won't join to anything.
ColumnPtr null_map_holder;
ConstNullMapPtr null_map{};
extractNestedColumnsAndNullMap(key_columns, null_map_holder, null_map);
/// reuse null_map to record the filtered rows, the rows contains NULL or does not
/// match the join filter won't join to anything
recordFilteredRows(block, non_equal_conditions.left_filter_column, null_map_holder, null_map);

size_t existing_columns = block.columns();
size_t existing_columns = block.columns();

/** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized.
/** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized.
* Because if they are constants, then in the "not joined" rows, they may have different values
* - default values, which can differ from the values of these constants.
*/
if (getFullness(kind))
if (getFullness(kind))
{
for (size_t i = 0; i < existing_columns; ++i)
{
for (size_t i = 0; i < existing_columns; ++i)
{
auto & col = block.getByPosition(i).column;
auto & col = block.getByPosition(i).column;

if (ColumnPtr converted = col->convertToFullColumnIfConst())
col = converted;
if (ColumnPtr converted = col->convertToFullColumnIfConst())
col = converted;

/// convert left columns (except keys) to Nullable
if (std::end(key_names_left) == std::find(key_names_left.begin(), key_names_left.end(), block.getByPosition(i).name))
convertColumnToNullable(block.getByPosition(i));
}
/// convert left columns (except keys) to Nullable
if (std::end(key_names_left) == std::find(key_names_left.begin(), key_names_left.end(), block.getByPosition(i).name))
convertColumnToNullable(block.getByPosition(i));
}
}

/** For LEFT/INNER JOIN, the saved blocks do not contain keys.
/** For LEFT/INNER JOIN, the saved blocks do not contain keys.
* For FULL/RIGHT JOIN, the saved blocks contain keys;
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
*/
size_t num_columns_to_skip = 0;
if (getFullness(kind))
num_columns_to_skip = keys_size;
size_t num_columns_to_skip = 0;
if (getFullness(kind))
num_columns_to_skip = keys_size;

/// Add new columns to the block.
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
/// Add new columns to the block.
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();

std::vector<size_t> right_table_column_indexes;
right_table_column_indexes.reserve(num_columns_to_add);
std::vector<size_t> right_table_column_indexes;
right_table_column_indexes.reserve(num_columns_to_add);

for (size_t i = 0; i < num_columns_to_add; ++i)
{
right_table_column_indexes.push_back(i + existing_columns);
}
for (size_t i = 0; i < num_columns_to_add; ++i)
{
right_table_column_indexes.push_back(i + existing_columns);
}

MutableColumns added_columns;
added_columns.reserve(num_columns_to_add);
MutableColumns added_columns;
added_columns.reserve(num_columns_to_add);

std::vector<size_t> right_indexes;
right_indexes.reserve(num_columns_to_add);
std::vector<size_t> right_indexes;
right_indexes.reserve(num_columns_to_add);

for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i);
RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name);
for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i);
RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name);

added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
right_indexes.push_back(num_columns_to_skip + i);
}
added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
right_indexes.push_back(num_columns_to_skip + i);
}

size_t rows = block.rows();
size_t rows = block.rows();

/// Used with ANY INNER JOIN
std::unique_ptr<IColumn::Filter> filter;
/// Used with ANY INNER JOIN
std::unique_ptr<IColumn::Filter> filter;

if (((kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Right) && strictness == ASTTableJoin::Strictness::Any)
|| kind == ASTTableJoin::Kind::Anti)
filter = std::make_unique<IColumn::Filter>(rows);
if (((kind == ASTTableJoin::Kind::Inner || kind == ASTTableJoin::Kind::Right) && strictness == ASTTableJoin::Strictness::Any)
|| kind == ASTTableJoin::Kind::Anti)
filter = std::make_unique<IColumn::Filter>(rows);

/// Used with ALL ... JOIN
IColumn::Offset current_offset = 0;
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
/// Used with ALL ... JOIN
IColumn::Offset current_offset = 0;
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;

if (strictness == ASTTableJoin::Strictness::All)
offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);
if (strictness == ASTTableJoin::Strictness::All)
offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows);

bool enable_spill_join = isEnableSpill();
JoinBuildInfo join_build_info{enable_fine_grained_shuffle, fine_grained_shuffle_count, enable_spill_join, is_spilled, build_concurrency, restore_round};
JoinPartition::probeBlock(partitions, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators, join_build_info, probe_process_info);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint);
for (size_t i = 0; i < num_columns_to_add; ++i)
bool enable_spill_join = isEnableSpill();
JoinBuildInfo join_build_info{enable_fine_grained_shuffle, fine_grained_shuffle_count, enable_spill_join, is_spilled, build_concurrency, restore_round};
JoinPartition::probeBlock(partitions, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators, join_build_info, probe_process_info);
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint);
for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i);
block.insert(ColumnWithTypeAndName(std::move(added_columns[i]), sample_col.type, sample_col.name));
}

size_t process_rows = probe_process_info.end_row - probe_process_info.start_row;

// if rows equal 0, we could ignore filter and offsets_to_replicate, and do not need to update start row.
if (likely(rows != 0))
{
/// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones.
if (filter && !(kind == ASTTableJoin::Kind::Anti && strictness == ASTTableJoin::Strictness::All))
{
const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i);
block.insert(ColumnWithTypeAndName(std::move(added_columns[i]), sample_col.type, sample_col.name));
// If ANY INNER | RIGHT JOIN, the result will not be spilt, so the block rows must equal process_rows.
RUNTIME_CHECK(rows == process_rows);
for (size_t i = 0; i < existing_columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1);
}

size_t process_rows = probe_process_info.end_row - probe_process_info.start_row;

// if rows equal 0, we could ignore filter and offsets_to_replicate, and do not need to update start row.
if (likely(rows != 0))
/// If ALL ... JOIN - we replicate all the columns except the new ones.
if (offsets_to_replicate)
{
/// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones.
if (filter && !(kind == ASTTableJoin::Kind::Anti && strictness == ASTTableJoin::Strictness::All))
for (size_t i = 0; i < existing_columns; ++i)
{
// If ANY INNER | RIGHT JOIN, the result will not be spilt, so the block rows must equal process_rows.
RUNTIME_CHECK(rows == process_rows);
for (size_t i = 0; i < existing_columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1);
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicateRange(probe_process_info.start_row, probe_process_info.end_row, *offsets_to_replicate);
}

/// If ALL ... JOIN - we replicate all the columns except the new ones.
if (offsets_to_replicate)
if (rows != process_rows)
{
for (size_t i = 0; i < existing_columns; ++i)
if (isLeftSemiFamily(kind))
{
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicateRange(probe_process_info.start_row, probe_process_info.end_row, *offsets_to_replicate);
}

if (rows != process_rows)
{
if (isLeftSemiFamily(kind))
{
auto helper_col = block.getByName(match_helper_name).column;
helper_col = helper_col->cut(probe_process_info.start_row, probe_process_info.end_row);
}
offsets_to_replicate->assign(offsets_to_replicate->begin() + probe_process_info.start_row, offsets_to_replicate->begin() + probe_process_info.end_row);
auto helper_col = block.getByName(match_helper_name).column;
helper_col = helper_col->cut(probe_process_info.start_row, probe_process_info.end_row);
}
offsets_to_replicate->assign(offsets_to_replicate->begin() + probe_process_info.start_row, offsets_to_replicate->begin() + probe_process_info.end_row);
}
}
}

/// handle other conditions
if (!non_equal_conditions.other_cond_name.empty() || !non_equal_conditions.other_eq_cond_from_in_name.empty())
{
if (!offsets_to_replicate)
throw Exception("Should not reach here, the strictness of join with other condition must be ALL");
handleOtherConditions(block, filter, offsets_to_replicate, right_table_column_indexes);
}
/// handle other conditions
if (!non_equal_conditions.other_cond_name.empty() || !non_equal_conditions.other_eq_cond_from_in_name.empty())
{
if (!offsets_to_replicate)
throw Exception("Should not reach here, the strictness of join with other condition must be ALL");
handleOtherConditions(block, filter, offsets_to_replicate, right_table_column_indexes);
}

return block;
}

Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const
{
std::vector<Block> result_blocks;
size_t result_rows = 0;
while (true)
{
auto block = doJoinBlockHash(probe_process_info);
result_rows += block.rows();
/// exit the while loop if
/// 1. probe_process_info.all_rows_joined_finish is true, which means all the rows in current block is processed
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ class Join
void insertFromBlockInternal(Block * stored_block, size_t stream_index);

Block joinBlockHash(ProbeProcessInfo & probe_process_info) const;
Block doJoinBlockHash(ProbeProcessInfo & probe_process_info) const;

Block joinBlockNullAware(ProbeProcessInfo & probe_process_info) const;

Expand Down

0 comments on commit 5b23886

Please sign in to comment.