Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some code refine in join #6680

Merged
merged 9 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ struct TiFlashJoin
/// @other_filter_column_name: column name of `and(other_cond1, other_cond2, ...)`
/// @other_eq_filter_from_in_column_name: column name of `and(other_eq_cond1_from_in, other_eq_cond2_from_in, ...)`
/// such as
/// `select * from t1 where col1 in (select col2 from t2 where t1.col2 = t2.col3)`
/// - other_filter is `t1.col2 = t2.col3`
/// `select * from t1 where col1 not in (select col2 from t2 where t1.col2 > t2.col3)`
/// - other_filter is `t1.col2 > t2.col3`
/// - other_eq_filter_from_in_column is `t1.col1 = t2.col2`
///
/// new columns from build side prepare join actions cannot be appended.
Expand Down
5 changes: 0 additions & 5 deletions dbms/src/Interpreters/ExpressionActions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ void ExpressionAction::prepare(Block & sample_block)
all_const = false;
}

ColumnPtr new_column;

/// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function.
if (all_const && function->isSuitableForConstantFolding())
{
Expand Down Expand Up @@ -452,9 +450,6 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)

if (action.type == ExpressionAction::APPLY_FUNCTION)
{
if (sample_block.has(action.result_name))
throw Exception("Column '" + action.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);

ColumnsWithTypeAndName arguments(action.argument_names.size());
for (size_t i = 0; i < action.argument_names.size(); ++i)
{
Expand Down
51 changes: 24 additions & 27 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock(
{
segment_index.reserve(rows_per_seg);
}
for (size_t i = 0; i < rows; i++)
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
{
Expand Down Expand Up @@ -692,7 +692,7 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock(
else
{
std::lock_guard lk(map.getSegmentMutex(segment_index));
for (size_t i = 0; i < segment_index_info[segment_index].size(); i++)
for (size_t i = 0; i < segment_index_info[segment_index].size(); ++i)
{
Inserter<STRICTNESS, typename Map::SegmentType::HashTable, KeyGetter>::insert(map.getSegmentTable(segment_index), key_getter, stored_block, segment_index_info[segment_index][i], pool, sort_key_containers);
}
Expand Down Expand Up @@ -1106,7 +1106,7 @@ struct Adder<KIND, ASTTableJoin::Strictness::All, Map>

static bool addNotFound(size_t num_columns_to_add, MutableColumns & added_columns, size_t i, IColumn::Filter * filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets, ProbeProcessInfo & probe_process_info)
{
if (KIND == ASTTableJoin::Kind::Inner)
if constexpr (KIND == ASTTableJoin::Kind::Inner)
{
(*offsets)[i] = current_offset;
}
Expand Down Expand Up @@ -1328,7 +1328,7 @@ void mergeNullAndFilterResult(Block & block, ColumnVector<UInt8>::Container & fi
{
const auto * nullable_column = checkAndGetColumn<ColumnNullable>(orig_filter_column.get());
const auto & nested_column_data = static_cast<const ColumnVector<UInt8> *>(nullable_column->getNestedColumnPtr().get())->getData();
for (size_t i = 0; i < nullable_column->size(); i++)
for (size_t i = 0; i < nullable_column->size(); ++i)
{
if (filter_column[i] == 0)
continue;
Expand All @@ -1342,7 +1342,7 @@ void mergeNullAndFilterResult(Block & block, ColumnVector<UInt8>::Container & fi
{
const auto * other_filter_column = checkAndGetColumn<ColumnVector<UInt8>>(orig_filter_column.get());
const auto & other_filter_column_data = static_cast<const ColumnVector<UInt8> *>(other_filter_column)->getData();
for (size_t i = 0; i < other_filter_column->size(); i++)
for (size_t i = 0; i < other_filter_column->size(); ++i)
filter_column[i] = filter_column[i] && other_filter_column_data[i];
}
}
Expand Down Expand Up @@ -1414,7 +1414,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>

/// for (anti)leftSemi join, we should keep only one row for each original row of left table.
/// and because it is semi join, we needn't save columns of right table, so we just keep the first replica.
for (size_t i = 0; i < offsets_to_replicate->size(); i++)
for (size_t i = 0; i < offsets_to_replicate->size(); ++i)
{
size_t prev_offset = i > 0 ? (*offsets_to_replicate)[i - 1] : 0;
size_t current_offset = (*offsets_to_replicate)[i];
Expand Down Expand Up @@ -1448,7 +1448,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
match_nullmap_vec[i] = 1;
}

for (size_t i = 0; i < block.columns(); i++)
for (size_t i = 0; i < block.columns(); ++i)
if (i != helper_pos)
block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1);
block.safeGetByPosition(helper_pos).column = ColumnNullable::create(std::move(match_col), std::move(match_nullmap));
Expand All @@ -1467,12 +1467,12 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
if (isInnerJoin(kind) && original_strictness == ASTTableJoin::Strictness::All)
{
/// inner join, just use other_filter_column to filter result
for (size_t i = 0; i < block.columns(); i++)
for (size_t i = 0; i < block.columns(); ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1);
return;
}

for (size_t i = 0, prev_offset = 0; i < offsets_to_replicate->size(); i++)
for (size_t i = 0, prev_offset = 0; i < offsets_to_replicate->size(); ++i)
{
size_t current_offset = (*offsets_to_replicate)[i];
bool has_row_kept = false;
Expand All @@ -1485,7 +1485,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
}
else
{
/// strictness = ALL && kind = Anti should not happens
/// original strictness = ALL && kind = Anti should not happen
row_filter[index] = filter[index];
}
if (row_filter[index])
Expand Down Expand Up @@ -1527,14 +1527,14 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
static_cast<ColumnNullable &>(*result_column).applyNegatedNullMap(*filter_column);
column.column = std::move(result_column);
}
for (size_t i = 0; i < block.columns(); i++)
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1);
return;
}
if (isInnerJoin(kind) || isAntiJoin(kind))
{
/// for semi/anti join, filter out not matched rows
for (size_t i = 0; i < block.columns(); i++)
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1);
return;
}
Expand Down Expand Up @@ -1606,7 +1606,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr
added_columns.reserve(num_columns_to_add);

std::vector<size_t> right_table_column_indexes;
for (size_t i = 0; i < num_columns_to_add; i++)
for (size_t i = 0; i < num_columns_to_add; ++i)
{
right_table_column_indexes.push_back(i + existing_columns);
}
Expand All @@ -1616,15 +1616,12 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr

for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i);
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i);
RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name);

/// Don't insert column if it's in left block.
if (!block.has(src_column.name))
{
added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
right_indexes.push_back(num_columns_to_skip + i);
}
added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
right_indexes.push_back(num_columns_to_skip + i);
}

size_t rows = block.rows();
Expand Down Expand Up @@ -1690,7 +1687,6 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1);
}


/// If ALL ... JOIN - we replicate all the columns except the new ones.
if (offsets_to_replicate)
{
Expand Down Expand Up @@ -1866,6 +1862,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i);
RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name);
block.insert(src_column);
}

Expand All @@ -1885,22 +1882,22 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
}

std::vector<size_t> right_column_index;
for (size_t i = 0; i < num_columns_to_add; i++)
for (size_t i = 0; i < num_columns_to_add; ++i)
right_column_index.push_back(num_existing_columns + i);

std::vector<Block> result_blocks;
for (size_t start = 0; start <= rows_left; start += left_rows_per_iter)
{
size_t end = std::min(start + left_rows_per_iter, rows_left);
MutableColumns dst_columns(num_existing_columns + num_columns_to_add);
for (size_t i = 0; i < block.columns(); i++)
MutableColumns dst_columns(block.columns());
for (size_t i = 0; i < block.columns(); ++i)
{
dst_columns[i] = block.getByPosition(i).column->cloneEmpty();
}
IColumn::Offset current_offset = 0;
std::unique_ptr<IColumn::Filter> is_row_matched = std::make_unique<IColumn::Filter>(end - start);
std::unique_ptr<IColumn::Offsets> expanded_row_size_after_join = std::make_unique<IColumn::Offsets>(end - start);
for (size_t i = start; i < end; i++)
for (size_t i = start; i < end; ++i)
{
if constexpr (has_null_map)
{
Expand Down Expand Up @@ -1936,7 +1933,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
{
auto & sample_block = result_blocks[0];
MutableColumns dst_columns(sample_block.columns());
for (size_t i = 0; i < sample_block.columns(); i++)
for (size_t i = 0; i < sample_block.columns(); ++i)
{
dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty();
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class Join
std::unique_ptr<ConcurrentHashMapWithSavedHash<StringRef, Mapped>> key_fixed_string;
std::unique_ptr<ConcurrentHashMap<UInt128, Mapped, HashCRC32<UInt128>>> keys128;
std::unique_ptr<ConcurrentHashMap<UInt256, Mapped, HashCRC32<UInt256>>> keys256;
std::unique_ptr<ConcurrentHashMap<StringRef, Mapped>> serialized;
std::unique_ptr<ConcurrentHashMapWithSavedHash<StringRef, Mapped>> serialized;
// TODO: add more cases like Aggregator
};

Expand Down