From 3585d1d2a0179d3b42b69a9aa6a31bc248e3fe38 Mon Sep 17 00:00:00 2001 From: Zhigao Tong Date: Wed, 1 Feb 2023 00:05:40 +0800 Subject: [PATCH] Revert "Some code refine in join (#6680)" This reverts commit 0b1ffce376174d7aca996fa2c3f2b1cfaa12b8f6. --- dbms/src/DataStreams/LimitTransformAction.cpp | 4 +- .../Flash/Coprocessor/JoinInterpreterHelper.h | 4 +- dbms/src/Interpreters/ExpressionActions.cpp | 5 ++ dbms/src/Interpreters/Join.cpp | 51 ++++++++++--------- dbms/src/Interpreters/Join.h | 2 +- 5 files changed, 37 insertions(+), 29 deletions(-) diff --git a/dbms/src/DataStreams/LimitTransformAction.cpp b/dbms/src/DataStreams/LimitTransformAction.cpp index 1fe4d06e520..86343477add 100644 --- a/dbms/src/DataStreams/LimitTransformAction.cpp +++ b/dbms/src/DataStreams/LimitTransformAction.cpp @@ -19,8 +19,8 @@ namespace DB { namespace { -// Removes all rows outside specified range of Block. -void cut(Block & block, size_t rows [[maybe_unused]], size_t limit, size_t pos) +// Removes all rows outside of specified range of Block. +void cut(Block & block, size_t rows, size_t limit, size_t pos) { assert(rows + limit > pos); size_t pop_back_cnt = pos - limit; diff --git a/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h b/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h index 61a677ce701..b9b107a82b0 100644 --- a/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h +++ b/dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h @@ -97,8 +97,8 @@ struct TiFlashJoin /// @other_filter_column_name: column name of `and(other_cond1, other_cond2, ...)` /// @other_eq_filter_from_in_column_name: column name of `and(other_eq_cond1_from_in, other_eq_cond2_from_in, ...)` /// such as - /// `select * from t1 where col1 not in (select col2 from t2 where t1.col2 > t2.col3)` - /// - other_filter is `t1.col2 > t2.col3` + /// `select * from t1 where col1 in (select col2 from t2 where t1.col2 = t2.col3)` + /// - other_filter is `t1.col2 = t2.col3` /// - other_eq_filter_from_in_column is `t1.col1 = t2.col2` /// /// new columns from build side prepare join actions cannot be appended. diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp index 46547a4f686..4f14f328b35 100644 --- a/dbms/src/Interpreters/ExpressionActions.cpp +++ b/dbms/src/Interpreters/ExpressionActions.cpp @@ -159,6 +159,8 @@ void ExpressionAction::prepare(Block & sample_block) all_const = false; } + ColumnPtr new_column; + /// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function. if (all_const && function->isSuitableForConstantFolding()) { @@ -450,6 +452,9 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names) if (action.type == ExpressionAction::APPLY_FUNCTION) { + if (sample_block.has(action.result_name)) + throw Exception("Column '" + action.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN); + ColumnsWithTypeAndName arguments(action.argument_names.size()); for (size_t i = 0; i < action.argument_names.size(); ++i) { diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index de0a3a6713e..f03e1b293f7 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -654,7 +654,7 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock( { segment_index.reserve(rows_per_seg); } - for (size_t i = 0; i < rows; ++i) + for (size_t i = 0; i < rows; i++) { if (has_null_map && (*null_map)[i]) { @@ -692,7 +692,7 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock( else { std::lock_guard lk(map.getSegmentMutex(segment_index)); - for (size_t i = 0; i < segment_index_info[segment_index].size(); ++i) + for (size_t i = 0; i < segment_index_info[segment_index].size(); i++) { Inserter::insert(map.getSegmentTable(segment_index), key_getter, stored_block, segment_index_info[segment_index][i], pool, sort_key_containers); } @@ -1106,7 +1106,7 @@ struct Adder static bool addNotFound(size_t num_columns_to_add, MutableColumns & added_columns, size_t i, IColumn::Filter * filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets, ProbeProcessInfo & probe_process_info) { - if constexpr (KIND == ASTTableJoin::Kind::Inner) + if (KIND == ASTTableJoin::Kind::Inner) { (*offsets)[i] = current_offset; } @@ -1328,7 +1328,7 @@ void mergeNullAndFilterResult(Block & block, ColumnVector::Container & fi { const auto * nullable_column = checkAndGetColumn(orig_filter_column.get()); const auto & nested_column_data = static_cast *>(nullable_column->getNestedColumnPtr().get())->getData(); - for (size_t i = 0; i < nullable_column->size(); ++i) + for (size_t i = 0; i < nullable_column->size(); i++) { if (filter_column[i] == 0) continue; @@ -1342,7 +1342,7 @@ void mergeNullAndFilterResult(Block & block, ColumnVector::Container & fi { const auto * other_filter_column = checkAndGetColumn>(orig_filter_column.get()); const auto & other_filter_column_data = static_cast *>(other_filter_column)->getData(); - for (size_t i = 0; i < other_filter_column->size(); ++i) + for (size_t i = 0; i < other_filter_column->size(); i++) filter_column[i] = filter_column[i] && other_filter_column_data[i]; } } @@ -1414,7 +1414,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr /// for (anti)leftSemi join, we should keep only one row for each original row of left table. /// and because it is semi join, we needn't save columns of right table, so we just keep the first replica. - for (size_t i = 0; i < offsets_to_replicate->size(); ++i) + for (size_t i = 0; i < offsets_to_replicate->size(); i++) { size_t prev_offset = i > 0 ? (*offsets_to_replicate)[i - 1] : 0; size_t current_offset = (*offsets_to_replicate)[i]; @@ -1448,7 +1448,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr match_nullmap_vec[i] = 1; } - for (size_t i = 0; i < block.columns(); ++i) + for (size_t i = 0; i < block.columns(); i++) if (i != helper_pos) block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1); block.safeGetByPosition(helper_pos).column = ColumnNullable::create(std::move(match_col), std::move(match_nullmap)); @@ -1467,12 +1467,12 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr if (isInnerJoin(kind) && original_strictness == ASTTableJoin::Strictness::All) { /// inner join, just use other_filter_column to filter result - for (size_t i = 0; i < block.columns(); ++i) + for (size_t i = 0; i < block.columns(); i++) block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, -1); return; } - for (size_t i = 0, prev_offset = 0; i < offsets_to_replicate->size(); ++i) + for (size_t i = 0, prev_offset = 0; i < offsets_to_replicate->size(); i++) { size_t current_offset = (*offsets_to_replicate)[i]; bool has_row_kept = false; @@ -1485,7 +1485,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr } else { - /// original strictness = ALL && kind = Anti should not happen + /// strictness = ALL && kind = Anti should not happens row_filter[index] = filter[index]; } if (row_filter[index]) @@ -1527,14 +1527,14 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr static_cast(*result_column).applyNegatedNullMap(*filter_column); column.column = std::move(result_column); } - for (size_t i = 0; i < block.columns(); ++i) + for (size_t i = 0; i < block.columns(); i++) block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1); return; } if (isInnerJoin(kind) || isAntiJoin(kind)) { /// for semi/anti join, filter out not matched rows - for (size_t i = 0; i < block.columns(); ++i) + for (size_t i = 0; i < block.columns(); i++) block.getByPosition(i).column = block.getByPosition(i).column->filter(row_filter, -1); return; } @@ -1606,7 +1606,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr added_columns.reserve(num_columns_to_add); std::vector right_table_column_indexes; - for (size_t i = 0; i < num_columns_to_add; ++i) + for (size_t i = 0; i < num_columns_to_add; i++) { right_table_column_indexes.push_back(i + existing_columns); } @@ -1616,12 +1616,15 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr for (size_t i = 0; i < num_columns_to_add; ++i) { - const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); - RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name); + const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.safeGetByPosition(i); - added_columns.push_back(src_column.column->cloneEmpty()); - added_columns.back()->reserve(src_column.column->size()); - right_indexes.push_back(num_columns_to_skip + i); + /// Don't insert column if it's in left block. + if (!block.has(src_column.name)) + { + added_columns.push_back(src_column.column->cloneEmpty()); + added_columns.back()->reserve(src_column.column->size()); + right_indexes.push_back(num_columns_to_skip + i); + } } size_t rows = block.rows(); @@ -1687,6 +1690,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps, ProbeProcessInfo & pr block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(*filter, -1); } + /// If ALL ... JOIN - we replicate all the columns except the new ones. if (offsets_to_replicate) { @@ -1862,7 +1866,6 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[ for (size_t i = 0; i < num_columns_to_add; ++i) { const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i); - RUNTIME_CHECK_MSG(!block.has(src_column.name), "block from probe side has a column with the same name: {} as a column in sample_block_with_columns_to_add", src_column.name); block.insert(src_column); } @@ -1882,22 +1885,22 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[ } std::vector right_column_index; - for (size_t i = 0; i < num_columns_to_add; ++i) + for (size_t i = 0; i < num_columns_to_add; i++) right_column_index.push_back(num_existing_columns + i); std::vector result_blocks; for (size_t start = 0; start <= rows_left; start += left_rows_per_iter) { size_t end = std::min(start + left_rows_per_iter, rows_left); - MutableColumns dst_columns(block.columns()); - for (size_t i = 0; i < block.columns(); ++i) + MutableColumns dst_columns(num_existing_columns + num_columns_to_add); + for (size_t i = 0; i < block.columns(); i++) { dst_columns[i] = block.getByPosition(i).column->cloneEmpty(); } IColumn::Offset current_offset = 0; std::unique_ptr is_row_matched = std::make_unique(end - start); std::unique_ptr expanded_row_size_after_join = std::make_unique(end - start); - for (size_t i = start; i < end; ++i) + for (size_t i = start; i < end; i++) { if constexpr (has_null_map) { @@ -1933,7 +1936,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[ { auto & sample_block = result_blocks[0]; MutableColumns dst_columns(sample_block.columns()); - for (size_t i = 0; i < sample_block.columns(); ++i) + for (size_t i = 0; i < sample_block.columns(); i++) { dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty(); } diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 5c5b89a52d3..0ddccbf6959 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -276,7 +276,7 @@ class Join std::unique_ptr> key_fixed_string; std::unique_ptr>> keys128; std::unique_ptr>> keys256; - std::unique_ptr> serialized; + std::unique_ptr> serialized; // TODO: add more cases like Aggregator };