Skip to content

Commit

Permalink
Some code refine in join (#6680)
Browse files Browse the repository at this point in the history
ref #5900
gengliqi authored Jan 30, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent e64d8cc commit 0b1ffce
Showing 5 changed files with 29 additions and 37 deletions.
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/LimitTransformAction.cpp
Original file line number Diff line number Diff line change
@@ -19,8 +19,8 @@ namespace DB
{
namespace
{
// Removes all rows outside of specified range of Block.
void cut(Block & block, size_t rows, size_t limit, size_t pos)
// Removes all rows outside specified range of Block.
void cut(Block & block, size_t rows [[maybe_unused]], size_t limit, size_t pos)
{
assert(rows + limit > pos);
size_t pop_back_cnt = pos - limit;
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
Original file line number Diff line number Diff line change
@@ -97,8 +97,8 @@ struct TiFlashJoin
/// @other_filter_column_name: column name of `and(other_cond1, other_cond2, ...)`
/// @other_eq_filter_from_in_column_name: column name of `and(other_eq_cond1_from_in, other_eq_cond2_from_in, ...)`
/// such as
/// `select * from t1 where col1 in (select col2 from t2 where t1.col2 = t2.col3)`
/// - other_filter is `t1.col2 = t2.col3`
/// `select * from t1 where col1 not in (select col2 from t2 where t1.col2 > t2.col3)`
/// - other_filter is `t1.col2 > t2.col3`
/// - other_eq_filter_from_in_column is `t1.col1 = t2.col2`
///
/// new columns from build side prepare join actions cannot be appended.
5 changes: 0 additions & 5 deletions dbms/src/Interpreters/ExpressionActions.cpp
Original file line number Diff line number Diff line change
@@ -159,8 +159,6 @@ void ExpressionAction::prepare(Block & sample_block)
all_const = false;
}

ColumnPtr new_column;

/// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function.
if (all_const && function->isSuitableForConstantFolding())
{
@@ -452,9 +450,6 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)

if (action.type == ExpressionAction::APPLY_FUNCTION)
{
if (sample_block.has(action.result_name))
throw Exception("Column '" + action.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);

ColumnsWithTypeAndName arguments(action.argument_names.size());
for (size_t i = 0; i < action.argument_names.size(); ++i)
{
51 changes: 24 additions & 27 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
@@ -654,7 +654,7 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock(
{
segment_index.reserve(rows_per_seg);
}
for (size_t i = 0; i < rows; i++)
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
{
@@ -692,7 +692,7 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock(
else
{
std::lock_guard lk(map.getSegmentMutex(segment_index));
for (size_t i = 0; i < segment_index_info[segment_index].size(); i++)
for (size_t i = 0; i < segment_index_info[segment_index].size(); ++i)
{
Inserter<STRICTNESS, typename Map::SegmentType::HashTable, KeyGetter>::insert(map.getSegmentTable(segment_index), key_getter, stored_block, segment_index_info[segment_index][i], pool, sort_key_containers);
}
@@ -1106,7 +1106,7 @@ struct Adder<KIND, ASTTableJoin::Strictness::All, Map>

static bool addNotFound(size_t num_columns_to_add, MutableColumns & added_columns, size_t i, IColumn::Filter * filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets, ProbeProcessInfo & probe_process_info)
{
if (KIND == ASTTableJoin::Kind::Inner)
if constexpr (KIND == ASTTableJoin::Kind::Inner)
{
(*offsets)[i] = current_offset;
}
@@ -1328,7 +1328,7 @@ void mergeNullAndFilterResult(Block & block, ColumnVector<UInt8>::Container & fi
{
const auto * nullable_column = checkAndGetColumn<ColumnNullable>(orig_filter_column.get());
const auto & nested_column_data = static_cast<const ColumnVector<UInt8> *>(nullable_column->getNestedColumnPtr().get())->getData();
for (size_t i = 0; i < nullable_column->size(); i++)
for (size_t i = 0; i < nullable_column->size(); ++i)
{
if (filter_column[i] == 0)
continue;
@@ -1342,7 +1342,7 @@ void mergeNullAndFilterResult(Block & block, ColumnVector<UInt8>::Container & fi
{
const auto * other_filter_column = checkAndGetColumn<ColumnVector<UInt8>>(orig_filter_column.get());
const auto & other_filter_column_data = static_cast<const ColumnVector<UInt8> *>(other_filter_column)->getData();
for (size_t i = 0; i < other_filter_column->size(); i++)
for (size_t i = 0; i < other_filter_column->size(); ++i)
filter_column[i] = filter_column[i] && other_filter_column_data[i];
}
}
@@ -1414,7 +1414,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>

/// for (anti)leftSemi join, we should keep only one row for each original row of left table.
/// and because it is semi join, we needn't save columns of right table, so we just keep the first replica.
for (size_t i = 0; i < offsets_to_replicate->size(); i++)
for (size_t i = 0; i < offsets_to_replicate->size(); ++i)
{
size_t prev_offset = i > 0 ? (*offsets_to_replicate)[i - 1] : 0;
size_t current_offset = (*offsets_to_replicate)[i];
@@ -1448,7 +1448,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
match_nullmap_vec[i] = 1;
}

for (size_t i = 0; i < block.columns(); i++)
for (size_t i = 0; i < block.columns(); ++i)
if (i != helper_pos)
block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1);
block.safeGetByPosition(helper_pos).column = ColumnNullable::create(std::move(match_col), std::move(match_nullmap));
@@ -1467,12 +1467,12 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
if (isInnerJoin(kind) && original_strictness == ASTTableJoin::Strictness::All)
{
/// inner join, just use other_filter_column to filter result
for (size_t i = 0; i < block.columns(); i++)
for (size_t i = 0; i < block.columns(); ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1);
return;
}

for (size_t i = 0, prev_offset = 0; i < offsets_to_replicate->size(); i++)
for (size_t i = 0, prev_offset = 0; i < offsets_to_replicate->size(); ++i)
{
size_t current_offset = (*offsets_to_replicate)[i];
bool has_row_kept = false;
@@ -1485,7 +1485,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
}
else
{
/// strictness = ALL && kind = Anti should not happens
/// original strictness = ALL && kind = Anti should not happen
row_filter[index] = filter[index];
}
if (row_filter[index])
@@ -1527,14 +1527,14 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
static_cast<ColumnNullable &>(*result_column).applyNegatedNullMap(*filter_column);
column.column = std::move(result_column);
}
for (size_t i = 0; i < block.columns(); i++)
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1);
return;
}
if (isInnerJoin(kind) || isAntiJoin(kind))
{
/// for semi/anti join, filter out not matched rows
for (size_t i = 0; i < block.columns(); i++)
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1);
return;
}
@@ -1606,7 +1606,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr
added_columns.reserve(num_columns_to_add);

std::vector<size_t> right_table_column_indexes;
for (size_t i = 0; i < num_columns_to_add; i++)
for (size_t i = 0; i < num_columns_to_add; ++i)
{
right_table_column_indexes.push_back(i + existing_columns);
}
@@ -1616,15 +1616,12 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr

for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i);
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i);
RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name);

/// Don't insert column if it's in left block.
if (!block.has(src_column.name))
{
added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
right_indexes.push_back(num_columns_to_skip + i);
}
added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
right_indexes.push_back(num_columns_to_skip + i);
}

size_t rows = block.rows();
@@ -1690,7 +1687,6 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1);
}


/// If ALL ... JOIN - we replicate all the columns except the new ones.
if (offsets_to_replicate)
{
@@ -1866,6 +1862,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i);
RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name);
block.insert(src_column);
}

@@ -1885,22 +1882,22 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
}

std::vector<size_t> right_column_index;
for (size_t i = 0; i < num_columns_to_add; i++)
for (size_t i = 0; i < num_columns_to_add; ++i)
right_column_index.push_back(num_existing_columns + i);

std::vector<Block> result_blocks;
for (size_t start = 0; start <= rows_left; start += left_rows_per_iter)
{
size_t end = std::min(start + left_rows_per_iter, rows_left);
MutableColumns dst_columns(num_existing_columns + num_columns_to_add);
for (size_t i = 0; i < block.columns(); i++)
MutableColumns dst_columns(block.columns());
for (size_t i = 0; i < block.columns(); ++i)
{
dst_columns[i] = block.getByPosition(i).column->cloneEmpty();
}
IColumn::Offset current_offset = 0;
std::unique_ptr<IColumn::Filter> is_row_matched = std::make_unique<IColumn::Filter>(end - start);
std::unique_ptr<IColumn::Offsets> expanded_row_size_after_join = std::make_unique<IColumn::Offsets>(end - start);
for (size_t i = start; i < end; i++)
for (size_t i = start; i < end; ++i)
{
if constexpr (has_null_map)
{
@@ -1936,7 +1933,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
{
auto & sample_block = result_blocks[0];
MutableColumns dst_columns(sample_block.columns());
for (size_t i = 0; i < sample_block.columns(); i++)
for (size_t i = 0; i < sample_block.columns(); ++i)
{
dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty();
}
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
@@ -276,7 +276,7 @@ class Join
std::unique_ptr<ConcurrentHashMapWithSavedHash<StringRef, Mapped>> key_fixed_string;
std::unique_ptr<ConcurrentHashMap<UInt128, Mapped, HashCRC32<UInt128>>> keys128;
std::unique_ptr<ConcurrentHashMap<UInt256, Mapped, HashCRC32<UInt256>>> keys256;
std::unique_ptr<ConcurrentHashMap<StringRef, Mapped>> serialized;
std::unique_ptr<ConcurrentHashMapWithSavedHash<StringRef, Mapped>> serialized;
// TODO: add more cases like Aggregator
};

0 comments on commit 0b1ffce

Please sign in to comment.