Skip to content

Commit

Permalink
Revert "Some code refine in join (pingcap#6680)"
Browse files Browse the repository at this point in the history
This reverts commit 0b1ffce.
  • Loading branch information
solotzg committed Jan 31, 2023
1 parent d3837a2 commit 3585d1d
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 29 deletions.
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/LimitTransformAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ namespace DB
{
namespace
{
// Removes all rows outside specified range of Block.
void cut(Block & block, size_t rows [[maybe_unused]], size_t limit, size_t pos)
// Removes all rows outside of specified range of Block.
void cut(Block & block, size_t rows, size_t limit, size_t pos)
{
assert(rows + limit > pos);
size_t pop_back_cnt = pos - limit;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ struct TiFlashJoin
/// @other_filter_column_name: column name of `and(other_cond1, other_cond2, ...)`
/// @other_eq_filter_from_in_column_name: column name of `and(other_eq_cond1_from_in, other_eq_cond2_from_in, ...)`
/// such as
/// `select * from t1 where col1 not in (select col2 from t2 where t1.col2 > t2.col3)`
/// - other_filter is `t1.col2 > t2.col3`
/// `select * from t1 where col1 in (select col2 from t2 where t1.col2 = t2.col3)`
/// - other_filter is `t1.col2 = t2.col3`
/// - other_eq_filter_from_in_column is `t1.col1 = t2.col2`
///
/// new columns from build side prepare join actions cannot be appended.
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/ExpressionActions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ void ExpressionAction::prepare(Block & sample_block)
all_const = false;
}

ColumnPtr new_column;

/// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function.
if (all_const && function->isSuitableForConstantFolding())
{
Expand Down Expand Up @@ -450,6 +452,9 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)

if (action.type == ExpressionAction::APPLY_FUNCTION)
{
if (sample_block.has(action.result_name))
throw Exception("Column '" + action.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);

ColumnsWithTypeAndName arguments(action.argument_names.size());
for (size_t i = 0; i < action.argument_names.size(); ++i)
{
Expand Down
51 changes: 27 additions & 24 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock(
{
segment_index.reserve(rows_per_seg);
}
for (size_t i = 0; i < rows; ++i)
for (size_t i = 0; i < rows; i++)
{
if (has_null_map && (*null_map)[i])
{
Expand Down Expand Up @@ -692,7 +692,7 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock(
else
{
std::lock_guard lk(map.getSegmentMutex(segment_index));
for (size_t i = 0; i < segment_index_info[segment_index].size(); ++i)
for (size_t i = 0; i < segment_index_info[segment_index].size(); i++)
{
Inserter<STRICTNESS, typename Map::SegmentType::HashTable, KeyGetter>::insert(map.getSegmentTable(segment_index), key_getter, stored_block, segment_index_info[segment_index][i], pool, sort_key_containers);
}
Expand Down Expand Up @@ -1106,7 +1106,7 @@ struct Adder<KIND, ASTTableJoin::Strictness::All, Map>

static bool addNotFound(size_t num_columns_to_add, MutableColumns & added_columns, size_t i, IColumn::Filter * filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets, ProbeProcessInfo & probe_process_info)
{
if constexpr (KIND == ASTTableJoin::Kind::Inner)
if (KIND == ASTTableJoin::Kind::Inner)
{
(*offsets)[i] = current_offset;
}
Expand Down Expand Up @@ -1328,7 +1328,7 @@ void mergeNullAndFilterResult(Block & block, ColumnVector<UInt8>::Container & fi
{
const auto * nullable_column = checkAndGetColumn<ColumnNullable>(orig_filter_column.get());
const auto & nested_column_data = static_cast<const ColumnVector<UInt8> *>(nullable_column->getNestedColumnPtr().get())->getData();
for (size_t i = 0; i < nullable_column->size(); ++i)
for (size_t i = 0; i < nullable_column->size(); i++)
{
if (filter_column[i] == 0)
continue;
Expand All @@ -1342,7 +1342,7 @@ void mergeNullAndFilterResult(Block & block, ColumnVector<UInt8>::Container & fi
{
const auto * other_filter_column = checkAndGetColumn<ColumnVector<UInt8>>(orig_filter_column.get());
const auto & other_filter_column_data = static_cast<const ColumnVector<UInt8> *>(other_filter_column)->getData();
for (size_t i = 0; i < other_filter_column->size(); ++i)
for (size_t i = 0; i < other_filter_column->size(); i++)
filter_column[i] = filter_column[i] && other_filter_column_data[i];
}
}
Expand Down Expand Up @@ -1414,7 +1414,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>

/// for (anti)leftSemi join, we should keep only one row for each original row of left table.
/// and because it is semi join, we needn't save columns of right table, so we just keep the first replica.
for (size_t i = 0; i < offsets_to_replicate->size(); ++i)
for (size_t i = 0; i < offsets_to_replicate->size(); i++)
{
size_t prev_offset = i > 0 ? (*offsets_to_replicate)[i - 1] : 0;
size_t current_offset = (*offsets_to_replicate)[i];
Expand Down Expand Up @@ -1448,7 +1448,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
match_nullmap_vec[i] = 1;
}

for (size_t i = 0; i < block.columns(); ++i)
for (size_t i = 0; i < block.columns(); i++)
if (i != helper_pos)
block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1);
block.safeGetByPosition(helper_pos).column = ColumnNullable::create(std::move(match_col), std::move(match_nullmap));
Expand All @@ -1467,12 +1467,12 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
if (isInnerJoin(kind) && original_strictness == ASTTableJoin::Strictness::All)
{
/// inner join, just use other_filter_column to filter result
for (size_t i = 0; i < block.columns(); ++i)
for (size_t i = 0; i < block.columns(); i++)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1);
return;
}

for (size_t i = 0, prev_offset = 0; i < offsets_to_replicate->size(); ++i)
for (size_t i = 0, prev_offset = 0; i < offsets_to_replicate->size(); i++)
{
size_t current_offset = (*offsets_to_replicate)[i];
bool has_row_kept = false;
Expand All @@ -1485,7 +1485,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
}
else
{
/// original strictness = ALL && kind = Anti should not happen
/// strictness = ALL && kind = Anti should not happens
row_filter[index] = filter[index];
}
if (row_filter[index])
Expand Down Expand Up @@ -1527,14 +1527,14 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
static_cast<ColumnNullable &>(*result_column).applyNegatedNullMap(*filter_column);
column.column = std::move(result_column);
}
for (size_t i = 0; i < block.columns(); ++i)
for (size_t i = 0; i < block.columns(); i++)
block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1);
return;
}
if (isInnerJoin(kind) || isAntiJoin(kind))
{
/// for semi/anti join, filter out not matched rows
for (size_t i = 0; i < block.columns(); ++i)
for (size_t i = 0; i < block.columns(); i++)
block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1);
return;
}
Expand Down Expand Up @@ -1606,7 +1606,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr
added_columns.reserve(num_columns_to_add);

std::vector<size_t> right_table_column_indexes;
for (size_t i = 0; i < num_columns_to_add; ++i)
for (size_t i = 0; i < num_columns_to_add; i++)
{
right_table_column_indexes.push_back(i + existing_columns);
}
Expand All @@ -1616,12 +1616,15 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr

for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i);
RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name);
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i);

added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
right_indexes.push_back(num_columns_to_skip + i);
/// Don't insert column if it's in left block.
if (!block.has(src_column.name))
{
added_columns.push_back(src_column.column->cloneEmpty());
added_columns.back()->reserve(src_column.column->size());
right_indexes.push_back(num_columns_to_skip + i);
}
}

size_t rows = block.rows();
Expand Down Expand Up @@ -1687,6 +1690,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1);
}


/// If ALL ... JOIN - we replicate all the columns except the new ones.
if (offsets_to_replicate)
{
Expand Down Expand Up @@ -1862,7 +1866,6 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i);
RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name);
block.insert(src_column);
}

Expand All @@ -1882,22 +1885,22 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
}

std::vector<size_t> right_column_index;
for (size_t i = 0; i < num_columns_to_add; ++i)
for (size_t i = 0; i < num_columns_to_add; i++)
right_column_index.push_back(num_existing_columns + i);

std::vector<Block> result_blocks;
for (size_t start = 0; start <= rows_left; start += left_rows_per_iter)
{
size_t end = std::min(start + left_rows_per_iter, rows_left);
MutableColumns dst_columns(block.columns());
for (size_t i = 0; i < block.columns(); ++i)
MutableColumns dst_columns(num_existing_columns + num_columns_to_add);
for (size_t i = 0; i < block.columns(); i++)
{
dst_columns[i] = block.getByPosition(i).column->cloneEmpty();
}
IColumn::Offset current_offset = 0;
std::unique_ptr<IColumn::Filter> is_row_matched = std::make_unique<IColumn::Filter>(end - start);
std::unique_ptr<IColumn::Offsets> expanded_row_size_after_join = std::make_unique<IColumn::Offsets>(end - start);
for (size_t i = start; i < end; ++i)
for (size_t i = start; i < end; i++)
{
if constexpr (has_null_map)
{
Expand Down Expand Up @@ -1933,7 +1936,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
{
auto & sample_block = result_blocks[0];
MutableColumns dst_columns(sample_block.columns());
for (size_t i = 0; i < sample_block.columns(); ++i)
for (size_t i = 0; i < sample_block.columns(); i++)
{
dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty();
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class Join
std::unique_ptr<ConcurrentHashMapWithSavedHash<StringRef, Mapped>> key_fixed_string;
std::unique_ptr<ConcurrentHashMap<UInt128, Mapped, HashCRC32<UInt128>>> keys128;
std::unique_ptr<ConcurrentHashMap<UInt256, Mapped, HashCRC32<UInt256>>> keys256;
std::unique_ptr<ConcurrentHashMapWithSavedHash<StringRef, Mapped>> serialized;
std::unique_ptr<ConcurrentHashMap<StringRef, Mapped>> serialized;
// TODO: add more cases like Aggregator
};

Expand Down

0 comments on commit 3585d1d

Please sign in to comment.