Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some code refine in join #6680

Merged
merged 9 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ struct TiFlashJoin
/// @other_filter_column_name: column name of `and(other_cond1, other_cond2, ...)`
/// @other_eq_filter_from_in_column_name: column name of `and(other_eq_cond1_from_in, other_eq_cond2_from_in, ...)`
/// such as
/// `select * from t1 where col1 in (select col2 from t2 where t1.col2 = t2.col3)`
/// - other_filter is `t1.col2 = t2.col3`
/// `select * from t1 where col1 not in (select col2 from t2 where t1.col2 > t2.col3)`
/// - other_filter is `t1.col2 > t2.col3`
/// - other_eq_filter_from_in_column is `t1.col1 = t2.col2`
///
/// new columns from build side prepare join actions cannot be appended.
Expand Down
5 changes: 0 additions & 5 deletions dbms/src/Interpreters/ExpressionActions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ void ExpressionAction::prepare(Block & sample_block)
all_const = false;
}

ColumnPtr new_column;

/// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function.
if (all_const && function->isSuitableForConstantFolding())
{
Expand Down Expand Up @@ -452,9 +450,6 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)

if (action.type == ExpressionAction::APPLY_FUNCTION)
{
if (sample_block.has(action.result_name))
throw Exception("Column '" + action.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);

ColumnsWithTypeAndName arguments(action.argument_names.size());
for (size_t i = 0; i < action.argument_names.size(); ++i)
{
Expand Down
21 changes: 9 additions & 12 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,6 @@ void Join::insertFromBlock(const Block & block, size_t stream_index)
total_input_build_rows += block.rows();
blocks.push_back(block);
stored_block = &blocks.back();
original_blocks.push_back(block);
}
insertFromBlockInternal(stored_block, stream_index);
}
Expand Down Expand Up @@ -1106,7 +1105,7 @@ struct Adder<KIND, ASTTableJoin::Strictness::All, Map>

static bool addNotFound(size_t num_columns_to_add, MutableColumns & added_columns, size_t i, IColumn::Filter * filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets, ProbeProcessInfo & probe_process_info)
{
if (KIND == ASTTableJoin::Kind::Inner)
if constexpr (KIND == ASTTableJoin::Kind::Inner)
{
(*offsets)[i] = current_offset;
}
Expand Down Expand Up @@ -1485,7 +1484,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
}
else
{
/// strictness = ALL && kind = Anti should not happens
/// original strictness = ALL && kind = Anti should not happen
row_filter[index] = filter[index];
}
if (row_filter[index])
Expand Down Expand Up @@ -1618,13 +1617,12 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i);

/// Don't insert column if it's in left block.
if (!block.has(src_column.name))
{
added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
right_indexes.push_back(num_columns_to_skip + i);
}
if (block.has(src_column.name))
throw Exception("block from probe side has a column with the same name: " + src_column.name + "as a column in sample_block_with_columns_to_add");
gengliqi marked this conversation as resolved.
Show resolved Hide resolved
gengliqi marked this conversation as resolved.
Show resolved Hide resolved

added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
right_indexes.push_back(num_columns_to_skip + i);
}

size_t rows = block.rows();
Expand Down Expand Up @@ -1690,7 +1688,6 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1);
}


/// If ALL ... JOIN - we replicate all the columns except the new ones.
if (offsets_to_replicate)
{
Expand Down Expand Up @@ -1892,7 +1889,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
for (size_t start = 0; start <= rows_left; start += left_rows_per_iter)
{
size_t end = std::min(start + left_rows_per_iter, rows_left);
MutableColumns dst_columns(num_existing_columns + num_columns_to_add);
MutableColumns dst_columns(block.columns());
for (size_t i = 0; i < block.columns(); i++)
gengliqi marked this conversation as resolved.
Show resolved Hide resolved
{
dst_columns[i] = block.getByPosition(i).column->cloneEmpty();
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class Join
std::unique_ptr<ConcurrentHashMapWithSavedHash<StringRef, Mapped>> key_fixed_string;
std::unique_ptr<ConcurrentHashMap<UInt128, Mapped, HashCRC32<UInt128>>> keys128;
std::unique_ptr<ConcurrentHashMap<UInt256, Mapped, HashCRC32<UInt256>>> keys256;
std::unique_ptr<ConcurrentHashMap<StringRef, Mapped>> serialized;
std::unique_ptr<ConcurrentHashMapWithSavedHash<StringRef, Mapped>> serialized;
// TODO: add more cases like Aggregator
};

Expand Down Expand Up @@ -326,8 +326,6 @@ class Join
BlocksList blocks;
/// mutex to protect concurrent insert to blocks
std::mutex blocks_lock;
/// keep original block for concurrent build
Blocks original_blocks;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will soonly be used when join support spill, I think we can keep it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use BlocksList blocks?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the block in blocks is changed during insertFromBlock, the key columns are removed or moved to the start of the block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Maybe we can change this behavior in the future. If so, we can also delete the copy of the right join key columns.


MapsAny maps_any; /// For ANY LEFT|INNER JOIN
MapsAll maps_all; /// For ALL LEFT|INNER JOIN
Expand Down