diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 001d32b821e2b1..e81f42a98e286c 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -253,12 +253,13 @@ Status OrcReader::init_reader( _is_acid = is_acid; _tuple_descriptor = tuple_descriptor; _row_descriptor = row_descriptor; + if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) { + _not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(), + not_single_slot_filter_conjuncts->begin(), + not_single_slot_filter_conjuncts->end()); + } _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; _text_converter.reset(new TextConverter('\\')); - if (not_single_slot_filter_conjuncts) { - _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(), - not_single_slot_filter_conjuncts->end()); - } _obj_pool = std::make_shared(); { SCOPED_RAW_TIMER(&_statistics.create_reader_time); @@ -1389,6 +1390,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { return Status::OK(); } } + std::vector batch_vec; _fill_batch_vec(batch_vec, _batch.get(), 0); for (auto& col_name : _lazy_read_ctx.lazy_read_columns) { @@ -1407,8 +1409,24 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { RETURN_IF_ERROR(_fill_partition_columns(block, rr, _lazy_read_ctx.partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, rr, _lazy_read_ctx.missing_columns)); - RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, *_filter)); - Block::erase_useless_column(block, column_to_keep); + + if (block->rows() == 0) { + *eof = true; + return Status::OK(); + } + + if (!_not_single_slot_filter_conjuncts.empty()) { + std::vector filters; + filters.push_back(_filter.get()); + RETURN_IF_CATCH_EXCEPTION( + RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + _not_single_slot_filter_conjuncts, &filters, block, columns_to_filter, + column_to_keep))); + } else { + RETURN_IF_CATCH_EXCEPTION( + Block::filter_block_internal(block, columns_to_filter, *_filter)); + Block::erase_useless_column(block, column_to_keep); + } } else { uint64_t rr; SCOPED_RAW_TIMER(&_statistics.column_read_time); @@ -1457,10 +1475,16 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { batch_vec[orc_col_idx->second], _batch->numElements)); } *read_rows = rr; + RETURN_IF_ERROR( _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns)); + if (block->rows() == 0) { + *eof = true; + return Status::OK(); + } + _build_delete_row_filter(block, rr); std::vector columns_to_filter; @@ -1483,17 +1507,40 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { if (_delete_rows_filter_ptr) { filters.push_back(_delete_rows_filter_ptr.get()); } - RETURN_IF_CATCH_EXCEPTION( - RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - filter_conjuncts, &filters, block, columns_to_filter, column_to_keep))); + IColumn::Filter result_filter(block->rows(), 1); + bool can_filter_all = false; + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts( + filter_conjuncts, &filters, block, &result_filter, &can_filter_all)); + if (can_filter_all) { + for (auto& col : columns_to_filter) { + std::move(*block->get_by_position(col).column).assume_mutable()->clear(); + } + Block::erase_useless_column(block, column_to_keep); + _convert_dict_cols_to_string_cols(block, &batch_vec); + return Status::OK(); + } + if (!_not_single_slot_filter_conjuncts.empty()) { + _convert_dict_cols_to_string_cols(block, &batch_vec); + std::vector merged_filters; + merged_filters.push_back(&result_filter); + RETURN_IF_CATCH_EXCEPTION( + RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + _not_single_slot_filter_conjuncts, &merged_filters, block, + columns_to_filter, column_to_keep))); + } else { + RETURN_IF_CATCH_EXCEPTION( + Block::filter_block_internal(block, columns_to_filter, result_filter)); + Block::erase_useless_column(block, column_to_keep); + _convert_dict_cols_to_string_cols(block, &batch_vec); + } } else { if (_delete_rows_filter_ptr) { RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, (*_delete_rows_filter_ptr))); } Block::erase_useless_column(block, column_to_keep); + _convert_dict_cols_to_string_cols(block, &batch_vec); } - _convert_dict_cols_to_string_cols(block, &batch_vec); } return Status::OK(); } @@ -1581,8 +1628,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s _fill_partition_columns(block, size, _lazy_read_ctx.predicate_partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, size, _lazy_read_ctx.predicate_missing_columns)); if (_lazy_read_ctx.resize_first_column) { + // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0 + // The following process may be tricky and time-consuming, but we have no other way. block->get_by_position(0).column->assume_mutable()->resize(size); - _lazy_read_ctx.resize_first_column = true; } // transactional hive orc delete row @@ -1608,6 +1656,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s filter_conjuncts, &filters, block, _filter.get(), &can_filter_all)); if (_lazy_read_ctx.resize_first_column) { + // We have to clean the first column to insert right data. block->get_by_position(0).column->assume_mutable()->clear(); } diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index d05fb4c4782269..b558f064658f01 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -539,6 +539,7 @@ class OrcReader : public GenericReader { const TupleDescriptor* _tuple_descriptor; const RowDescriptor* _row_descriptor; + VExprContextSPtrs _not_single_slot_filter_conjuncts; const std::unordered_map* _slot_id_to_filter_conjuncts; VExprContextSPtrs _dict_filter_conjuncts; VExprContextSPtrs _non_dict_filter_conjuncts; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 6faaab01772d26..3ef3581e3658fd 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -110,12 +110,13 @@ Status RowGroupReader::init( _tuple_descriptor = tuple_descriptor; _row_descriptor = row_descriptor; _col_name_to_slot_id = colname_to_slot_id; + if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) { + _not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(), + not_single_slot_filter_conjuncts->begin(), + not_single_slot_filter_conjuncts->end()); + } _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; _text_converter.reset(new TextConverter('\\')); - if (not_single_slot_filter_conjuncts) { - _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(), - not_single_slot_filter_conjuncts->end()); - } _merge_read_ranges(row_ranges); if (_read_columns.empty()) { // Query task that only select columns in path. @@ -325,12 +326,32 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ if (_position_delete_ctx.has_filter) { filters.push_back(_pos_delete_filter_ptr.get()); } - - RETURN_IF_CATCH_EXCEPTION( - RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - _filter_conjuncts, &filters, block, columns_to_filter, - column_to_keep))); - _convert_dict_cols_to_string_cols(block); + IColumn::Filter result_filter(block->rows(), 1); + bool can_filter_all = false; + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts( + _filter_conjuncts, &filters, block, &result_filter, &can_filter_all)); + if (can_filter_all) { + for (auto& col : columns_to_filter) { + std::move(*block->get_by_position(col).column).assume_mutable()->clear(); + } + Block::erase_useless_column(block, column_to_keep); + _convert_dict_cols_to_string_cols(block); + return Status::OK(); + } + if (!_not_single_slot_filter_conjuncts.empty()) { + _convert_dict_cols_to_string_cols(block); + std::vector merged_filters; + merged_filters.push_back(&result_filter); + RETURN_IF_CATCH_EXCEPTION( + RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + _not_single_slot_filter_conjuncts, &merged_filters, block, + columns_to_filter, column_to_keep))); + } else { + RETURN_IF_CATCH_EXCEPTION( + Block::filter_block_internal(block, columns_to_filter, result_filter)); + Block::erase_useless_column(block, column_to_keep); + _convert_dict_cols_to_string_cols(block); + } } else { RETURN_IF_CATCH_EXCEPTION( RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter))); @@ -404,7 +425,12 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re std::unique_ptr select_vector_ptr = nullptr; size_t pre_read_rows; bool pre_eof; + std::vector columns_to_filter; size_t origin_column_num = block->columns(); + columns_to_filter.resize(origin_column_num); + for (uint32_t i = 0; i < origin_column_num; ++i) { + columns_to_filter[i] = i; + } IColumn::Filter result_filter; while (true) { // read predicate columns @@ -538,7 +564,15 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re *batch_eof = pre_eof; RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns)); - return _fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns); + RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns)); + if (!_not_single_slot_filter_conjuncts.empty()) { + std::vector filters; + filters.push_back(&result_filter); + RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + _not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter, + origin_column_num))); + } + return Status::OK(); } void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector, diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 393f7388574929..ef35f5adf21819 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -209,6 +209,7 @@ class RowGroupReader { const TupleDescriptor* _tuple_descriptor; const RowDescriptor* _row_descriptor; const std::unordered_map* _col_name_to_slot_id; + VExprContextSPtrs _not_single_slot_filter_conjuncts; const std::unordered_map* _slot_id_to_filter_conjuncts; VExprContextSPtrs _dict_filter_conjuncts; VExprContextSPtrs _filter_conjuncts; diff --git a/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive.out b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive.out index 54dc2d9a403ddd..ae29339cec32ac 100644 --- a/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive.out +++ b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive.out @@ -96,3 +96,27 @@ Z6n2t4XA2n7CXTECJ,PE,iBbsCh0RE1Dd2A,z48 -- !pr21598 -- 5 +-- !not_single_slot_filter_conjuncts_orc -- +\N 289572 4 1980215 480218.00 24.00 31082.88 0.05 0 R F 1994-12-14 1995-01-01 COLLECT COD AIR final accounts. instructions boost above +\N 388932 2 6038830 538843.00 46.00 81352.38 0.02 0.06 A F 1994-12-15 1995-01-01 NONE MAIL ven ideas are furiously according +\N 452964 3 14917531 167546.00 20.00 30955.80 0.02 0.03 R F 1994-12-03 1995-01-01 COLLECT COD AIR deposits. blithely even deposits a +\N 570084 4 14861731 361760.00 26.00 43991.74 0.05 0.08 A F 1994-11-03 1995-01-01 COLLECT COD MAIL ending hockey players wake f +\N 637092 4 15648780 148811.00 26.00 44928.00 0.06 0.04 R F 1994-11-14 1995-01-01 COLLECT COD SHIP lar deposits. as +\N 1084260 2 6109231 609244.00 10.00 12399.30 0.01 0.03 R F 1994-11-05 1995-01-01 DELIVER IN PERSON RAIL efully pending sentiments. epita +\N 1150884 1 13245123 245124.00 49.00 52305.54 0.05 0.02 R F 1994-12-22 1995-01-01 DELIVER IN PERSON REG AIR rious deposits about the quickly bold +\N 1578180 1 19168165 918223.00 10.00 12322.10 0.07 0.07 R F 1994-10-31 1995-01-01 COLLECT COD TRUCK ges. accounts sublate carefully +\N 2073732 2 13846443 596483.00 21.00 29163.75 0.10 0.08 R F 1994-12-06 1995-01-01 DELIVER IN PERSON FOB dolphins nag furiously q +\N 2479044 4 9763795 13805.00 40.00 74332.40 0.05 0.05 R F 1994-11-16 1995-01-01 COLLECT COD RAIL equests hinder qu + +-- !not_single_slot_filter_conjuncts_parquet -- +\N 289572 4 1980215 480218.00 24.00 31082.88 0.05 0 R F 1994-12-14 1995-01-01 COLLECT COD AIR final accounts. instructions boost above +\N 388932 2 6038830 538843.00 46.00 81352.38 0.02 0.06 A F 1994-12-15 1995-01-01 NONE MAIL ven ideas are furiously according +\N 452964 3 14917531 167546.00 20.00 30955.80 0.02 0.03 R F 1994-12-03 1995-01-01 COLLECT COD AIR deposits. blithely even deposits a +\N 570084 4 14861731 361760.00 26.00 43991.74 0.05 0.08 A F 1994-11-03 1995-01-01 COLLECT COD MAIL ending hockey players wake f +\N 637092 4 15648780 148811.00 26.00 44928.00 0.06 0.04 R F 1994-11-14 1995-01-01 COLLECT COD SHIP lar deposits. as +\N 1084260 2 6109231 609244.00 10.00 12399.30 0.01 0.03 R F 1994-11-05 1995-01-01 DELIVER IN PERSON RAIL efully pending sentiments. epita +\N 1150884 1 13245123 245124.00 49.00 52305.54 0.05 0.02 R F 1994-12-22 1995-01-01 DELIVER IN PERSON REG AIR rious deposits about the quickly bold +\N 1578180 1 19168165 918223.00 10.00 12322.10 0.07 0.07 R F 1994-10-31 1995-01-01 COLLECT COD TRUCK ges. accounts sublate carefully +\N 2073732 2 13846443 596483.00 21.00 29163.75 0.10 0.08 R F 1994-12-06 1995-01-01 DELIVER IN PERSON FOB dolphins nag furiously q +\N 2479044 4 9763795 13805.00 40.00 74332.40 0.05 0.05 R F 1994-11-16 1995-01-01 COLLECT COD RAIL equests hinder qu + diff --git a/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive.groovy b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive.groovy index df06c7e246cf98..e0a56e89c689f8 100644 --- a/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive.groovy +++ b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive.groovy @@ -89,6 +89,10 @@ suite("test_external_catalog_hive", "p2") { // test #21598 qt_pr21598 """select count(*) from( (SELECT r_regionkey AS key1, r_name AS name, pday AS pday FROM (SELECT r_regionkey, r_name, replace(r_comment, ' ', 'aaaa') AS pday FROM ${catalog_name}.tpch_1000_parquet.region) t2))x;""" + // test not_single_slot_filter_conjuncts with dict filter issue + qt_not_single_slot_filter_conjuncts_orc """ select * from multi_catalog.lineitem_string_date_orc where l_commitdate < l_receiptdate and l_receiptdate = '1995-01-01' order by l_orderkey, l_partkey, l_suppkey, l_linenumber limit 10; """ + qt_not_single_slot_filter_conjuncts_parquet """ select * from multi_catalog.lineitem_string_date_orc where l_commitdate < l_receiptdate and l_receiptdate = '1995-01-01' order by l_orderkey, l_partkey, l_suppkey, l_linenumber limit 10; """ + // test remember last used database after switch / rename catalog sql """switch ${catalog_name};"""