Skip to content

Commit

Permalink
Support finalize for join (#8366)
Browse files Browse the repository at this point in the history
close #8296
  • Loading branch information
windtalker authored Dec 28, 2023
1 parent 10f9542 commit 58c5531
Show file tree
Hide file tree
Showing 58 changed files with 1,611 additions and 576 deletions.
151 changes: 56 additions & 95 deletions dbms/src/DataStreams/ScanHashMapAfterProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct AdderMapEntry<ASTTableJoin::Strictness::Any, Mapped>
size_t key_num,
size_t num_columns_left,
MutableColumns & columns_left,
size_t num_columns_right,
const ColumnNumbers & column_indices_right,
MutableColumns & columns_right,
const void *&,
size_t,
Expand All @@ -52,8 +52,10 @@ struct AdderMapEntry<ASTTableJoin::Strictness::Any, Mapped>
/// for detailed explanation
columns_left[j]->insertDefault();

for (size_t j = 0; j < num_columns_right; ++j)
columns_right[j]->insertFrom(*mapped.block->getByPosition(key_num + j).column.get(), mapped.row_num);
for (size_t j = 0; j < column_indices_right.size(); ++j)
columns_right[j]->insertFrom(
*mapped.block->getByPosition(key_num + column_indices_right[j]).column.get(),
mapped.row_num);
return 1;
}
};
Expand All @@ -66,7 +68,7 @@ struct AdderMapEntry<ASTTableJoin::Strictness::All, Mapped>
size_t key_num,
size_t num_columns_left,
MutableColumns & columns_left,
size_t num_columns_right,
const ColumnNumbers & column_indices_right,
MutableColumns & columns_right,
const void *& next_element_in_row_list,
size_t probe_cached_rows_threshold,
Expand All @@ -78,9 +80,9 @@ struct AdderMapEntry<ASTTableJoin::Strictness::All, Mapped>

auto add_one_row = [&]() {
/// handle left columns later to utilize insertManyDefaults
for (size_t j = 0; j < num_columns_right; ++j)
for (size_t j = 0; j < column_indices_right.size(); ++j)
columns_right[j]->insertFrom(
*current->block->getByPosition(key_num + j).column.get(),
*current->block->getByPosition(key_num + column_indices_right[j]).column.get(),
current->row_num);
++rows_added;
};
Expand Down Expand Up @@ -124,7 +126,7 @@ struct AdderRowFlaggedMapEntry
size_t key_num,
size_t num_columns_left,
MutableColumns & columns_left,
size_t num_columns_right,
const ColumnNumbers & column_indices_right,
MutableColumns & columns_right,
const void *& next_element_in_row_list,
size_t probe_cached_rows_threshold,
Expand All @@ -141,9 +143,9 @@ struct AdderRowFlaggedMapEntry
if (flag)
{
/// handle left columns later to utilize insertManyDefaults if any
for (size_t j = 0; j < num_columns_right; ++j)
for (size_t j = 0; j < column_indices_right.size(); ++j)
columns_right[j]->insertFrom(
*current->block->getByPosition(key_num + j).column.get(),
*current->block->getByPosition(key_num + column_indices_right[j]).column.get(),
current->row_num);
++rows_added;
}
Expand Down Expand Up @@ -199,51 +201,50 @@ ScanHashMapAfterProbeBlockInputStream::ScanHashMapAfterProbeBlockInputStream(
* result_sample_block - keys, "left" columns, and "right" columns.
*/

size_t num_columns_left = left_sample_block.columns();
if (isRightSemiFamily(parent.getKind()))
num_columns_left = 0;
else
result_sample_block = materializeBlock(left_sample_block);

size_t num_columns_right = parent.sample_block_with_columns_to_add.columns();
/// Add columns from the right-side table to the block.
for (size_t i = 0; i < num_columns_right; ++i)
column_indices_left.reserve(left_sample_block.columns());
if (!isRightSemiFamily(parent.getKind()))
{
const ColumnWithTypeAndName & src_column = parent.sample_block_with_columns_to_add.getByPosition(i);
result_sample_block.insert(src_column.cloneEmpty());
auto left_full_block = materializeBlock(left_sample_block);
for (size_t i = 0; i < left_full_block.columns(); ++i)
{
auto & column = left_full_block.getByPosition(i);
if (parent.output_column_names_set_after_finalize.contains(column.name))
{
result_sample_block.insert(column.cloneEmpty());
column_indices_left.push_back(i);
}
}
}

column_indices_left.reserve(num_columns_left);
column_indices_right.reserve(num_columns_right);

for (size_t i = 0; i < num_columns_left; ++i)
column_indices_right.reserve(parent.sample_block_without_keys.columns());
/// Add columns from the right-side table to the block.
for (size_t i = 0; i < parent.sample_block_without_keys.columns(); ++i)
{
column_indices_left.push_back(i);
const ColumnWithTypeAndName & src_column = parent.sample_block_without_keys.getByPosition(i);
if (parent.output_column_names_set_after_finalize.contains(src_column.name))
{
result_sample_block.insert(src_column.cloneEmpty());
column_indices_right.push_back(i);
}
}

for (size_t i = 0; i < num_columns_right; ++i)
column_indices_right.push_back(num_columns_left + i);

for (size_t i = 0; i < num_columns_left; ++i)
for (size_t i = 0; i < column_indices_left.size(); ++i)
{
const auto & column_with_type_and_name = result_sample_block.getByPosition(column_indices_left[i]);
const auto & column_with_type_and_name = result_sample_block.getByPosition(i);
if (parent.key_names_left.end()
== std::find(parent.key_names_left.begin(), parent.key_names_left.end(), column_with_type_and_name.name))
/// if it is not the key, then convert to nullable, if it is key, then just keep the original type
/// actually we don't care about the key column now refer to https://github.com/pingcap/tiflash/blob/v6.5.0/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp#L953
/// for detailed explanation
convertColumnToNullable(result_sample_block.getByPosition(column_indices_left[i]));
convertColumnToNullable(result_sample_block.getByPosition(i));
}

columns_left.resize(num_columns_left);
columns_right.resize(num_columns_right);
columns_left.resize(column_indices_left.size());
columns_right.resize(column_indices_right.size());
current_partition_index = index;
projected_sample_block = result_sample_block;

for (const auto & name : parent.tidb_output_column_names)
{
auto & column = result_sample_block.getByName(name);
projected_sample_block.insert(column);
}
projected_sample_block = parent.removeUselessColumn(projected_sample_block);
}

Block ScanHashMapAfterProbeBlockInputStream::readImpl()
Expand Down Expand Up @@ -271,15 +272,15 @@ Block ScanHashMapAfterProbeBlockInputStream::readImpl()

for (size_t i = 0; i < num_columns_left; ++i)
{
const auto & src_col = result_sample_block.safeGetByPosition(column_indices_left[i]);
const auto & src_col = result_sample_block.safeGetByPosition(i);
columns_left[i] = src_col.type->createColumn();
if (row_counter_column == nullptr)
row_counter_column = columns_left[i].get();
}

for (size_t i = 0; i < num_columns_right; ++i)
{
const auto & src_col = result_sample_block.safeGetByPosition(column_indices_right[i]);
const auto & src_col = result_sample_block.safeGetByPosition(num_columns_left + i);
columns_right[i] = src_col.type->createColumn();
if (row_counter_column == nullptr)
row_counter_column = columns_right[i].get();
Expand All @@ -292,44 +293,19 @@ Block ScanHashMapAfterProbeBlockInputStream::readImpl()
{
case ASTTableJoin::Kind::RightSemi:
if (parent.has_other_condition)
fillColumnsUsingCurrentPartition<true, true>(
num_columns_left,
columns_left,
num_columns_right,
columns_right,
row_counter_column);
fillColumnsUsingCurrentPartition<true, true>(columns_left, columns_right, row_counter_column);
else
fillColumnsUsingCurrentPartition<false, true>(
num_columns_left,
columns_left,
num_columns_right,
columns_right,
row_counter_column);
fillColumnsUsingCurrentPartition<false, true>(columns_left, columns_right, row_counter_column);
break;
case ASTTableJoin::Kind::RightAnti:
case ASTTableJoin::Kind::RightOuter:
if (parent.has_other_condition)
fillColumnsUsingCurrentPartition<true, false>(
num_columns_left,
columns_left,
num_columns_right,
columns_right,
row_counter_column);
fillColumnsUsingCurrentPartition<true, false>(columns_left, columns_right, row_counter_column);
else
fillColumnsUsingCurrentPartition<false, false>(
num_columns_left,
columns_left,
num_columns_right,
columns_right,
row_counter_column);
fillColumnsUsingCurrentPartition<false, false>(columns_left, columns_right, row_counter_column);
break;
default:
fillColumnsUsingCurrentPartition<false, false>(
num_columns_left,
columns_left,
num_columns_right,
columns_right,
row_counter_column);
fillColumnsUsingCurrentPartition<false, false>(columns_left, columns_right, row_counter_column);
}
}

Expand All @@ -338,25 +314,16 @@ Block ScanHashMapAfterProbeBlockInputStream::readImpl()

Block res = result_sample_block.cloneEmpty();
for (size_t i = 0; i < num_columns_left; ++i)
res.getByPosition(column_indices_left[i]).column = std::move(columns_left[i]);
res.getByPosition(i).column = std::move(columns_left[i]);
for (size_t i = 0; i < num_columns_right; ++i)
res.getByPosition(column_indices_right[i]).column = std::move(columns_right[i]);
res.getByPosition(num_columns_left + i).column = std::move(columns_right[i]);

/// remove useless columns
Block projected_block;
for (const auto & name : parent.tidb_output_column_names)
{
auto & column = res.getByName(name);
projected_block.insert(std::move(column));
}
return projected_block;
return parent.removeUselessColumn(res);
}

template <bool row_flagged, bool output_joined_rows>
void ScanHashMapAfterProbeBlockInputStream::fillColumnsUsingCurrentPartition(
size_t num_columns_left,
MutableColumns & mutable_columns_left,
size_t num_columns_right,
MutableColumns & mutable_columns_right,
IColumn * row_counter_column)
{
Expand Down Expand Up @@ -384,9 +351,7 @@ void ScanHashMapAfterProbeBlockInputStream::fillColumnsUsingCurrentPartition(
case JoinMapMethod::METHOD: \
fillColumns<ASTTableJoin::Strictness::All, true, output_joined_rows>( \
*partition->maps_all_full_with_row_flag.METHOD, \
num_columns_left, \
mutable_columns_left, \
num_columns_right, \
mutable_columns_right, \
row_counter_column); \
break;
Expand All @@ -405,9 +370,7 @@ void ScanHashMapAfterProbeBlockInputStream::fillColumnsUsingCurrentPartition(
case JoinMapMethod::METHOD: \
fillColumns<ASTTableJoin::Strictness::All, false, output_joined_rows>( \
*partition->maps_all_full.METHOD, \
num_columns_left, \
mutable_columns_left, \
num_columns_right, \
mutable_columns_right, \
row_counter_column); \
break;
Expand Down Expand Up @@ -448,9 +411,7 @@ struct RowCountInfo
template <ASTTableJoin::Strictness STRICTNESS, bool row_flagged, bool output_joined_rows, typename Map>
void ScanHashMapAfterProbeBlockInputStream::fillColumns(
const Map & map,
size_t num_columns_left,
MutableColumns & mutable_columns_left,
size_t num_columns_right,
MutableColumns & mutable_columns_right,
IColumn * row_counter_column)
{
Expand All @@ -461,17 +422,17 @@ void ScanHashMapAfterProbeBlockInputStream::fillColumns(
{
row_count_info.inc(1);
/// handle left columns later to utilize insertManyDefaults
for (size_t j = 0; j < num_columns_right; ++j)
for (size_t j = 0; j < column_indices_right.size(); ++j)
mutable_columns_right[j]->insertFrom(
*not_mapped_row_pos->block->getByPosition(key_num + j).column.get(),
*not_mapped_row_pos->block->getByPosition(key_num + column_indices_right[j]).column.get(),
not_mapped_row_pos->row_num);

not_mapped_row_pos = not_mapped_row_pos->next;
if (row_count_info.reachMaxRows())
break;
}
/// Fill left columns with defaults
for (size_t j = 0; j < num_columns_left; ++j)
for (size_t j = 0; j < column_indices_left.size(); ++j)
/// should fill the key column with key columns from right block
/// but we don't care about the key column now so just insert a default value is ok.
/// refer to https://github.com/pingcap/tiflash/blob/v6.5.0/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp#L953
Expand Down Expand Up @@ -500,9 +461,9 @@ void ScanHashMapAfterProbeBlockInputStream::fillColumns(
row_count_info.inc(AdderRowFlaggedMapEntry<output_joined_rows, typename Map::mapped_type>::add(
(*it)->getMapped(),
key_num,
num_columns_left,
column_indices_left.size(),
mutable_columns_left,
num_columns_right,
column_indices_right,
mutable_columns_right,
next_element_in_row_list,
parent.probe_cache_column_threshold,
Expand All @@ -521,9 +482,9 @@ void ScanHashMapAfterProbeBlockInputStream::fillColumns(
row_count_info.inc(AdderMapEntry<STRICTNESS, typename Map::mapped_type>::add(
(*it)->getMapped(),
key_num,
num_columns_left,
column_indices_left.size(),
mutable_columns_left,
num_columns_right,
column_indices_right,
mutable_columns_right,
next_element_in_row_list,
parent.probe_cache_column_threshold,
Expand Down
8 changes: 2 additions & 6 deletions dbms/src/DataStreams/ScanHashMapAfterProbeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ class ScanHashMapAfterProbeBlockInputStream : public IProfilingBlockInputStream

Block result_sample_block;
Block projected_sample_block; /// same schema with join's final schema
/// Indices of columns in result_sample_block that come from the left-side table (except key columns).
/// Indices of columns in left sample block
ColumnNumbers column_indices_left;
/// Indices of columns that come from the right-side table.
/// Indices of columns in right sample block
/// Order is significant: it is the same as the order of columns in the blocks of the right-side table that are saved in parent.blocks.
ColumnNumbers column_indices_right;
/// Columns of the current output block corresponding to column_indices_left.
Expand All @@ -82,17 +82,13 @@ class ScanHashMapAfterProbeBlockInputStream : public IProfilingBlockInputStream
template <ASTTableJoin::Strictness STRICTNESS, bool row_flagged, bool output_joined_rows, typename Map>
void fillColumns(
const Map & map,
size_t num_columns_left,
MutableColumns & mutable_columns_left,
size_t num_columns_right,
MutableColumns & mutable_columns_right,
IColumn * row_counter_column);

template <bool row_flagged, bool output_joined_rows>
void fillColumnsUsingCurrentPartition(
size_t num_columns_left,
MutableColumns & mutable_columns_left,
size_t num_columns_right,
MutableColumns & mutable_columns_right,
IColumn * row_counter_column);
};
Expand Down
Loading

0 comments on commit 58c5531

Please sign in to comment.