Skip to content

Commit

Permalink
[Fix](multi-catalog) Fix not single slot filter conjuncts with dict f…
Browse files Browse the repository at this point in the history
…ilter issue. (apache#22052)

### Issue
Dictionary filtering is a mechanism that evaluates a filter condition on a single string column by comparing directly against the column's dictionary encoding. However, a dictionary-filtered string column may also be referenced by other, multi-column filter conditions. This can cause problems.

For example:
`select * from multi_catalog.lineitem_string_date_orc where l_commitdate < l_receiptdate and l_receiptdate = '1995-01-01'  order by l_orderkey, l_partkey, l_suppkey, l_linenumber limit 10;`

`l_receiptdate` is a string filter column, and it is also referenced by the multi-column filter condition `l_commitdate < l_receiptdate`.

### Solution
Resolve it by separating out the multi-column filter conditions and executing them after the dictionary-filtered column has been converted to a string column.
  • Loading branch information
kaka11chen authored Jul 24, 2023
1 parent fc67929 commit 752cec9
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 22 deletions.
71 changes: 60 additions & 11 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,13 @@ Status OrcReader::init_reader(
_is_acid = is_acid;
_tuple_descriptor = tuple_descriptor;
_row_descriptor = row_descriptor;
if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
_not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
not_single_slot_filter_conjuncts->begin(),
not_single_slot_filter_conjuncts->end());
}
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_text_converter.reset(new TextConverter('\\'));
if (not_single_slot_filter_conjuncts) {
_filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
not_single_slot_filter_conjuncts->end());
}
_obj_pool = std::make_shared<ObjectPool>();
{
SCOPED_RAW_TIMER(&_statistics.create_reader_time);
Expand Down Expand Up @@ -1389,6 +1390,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
return Status::OK();
}
}

std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, _batch.get(), 0);
for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
Expand All @@ -1407,8 +1409,24 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {

RETURN_IF_ERROR(_fill_partition_columns(block, rr, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, rr, _lazy_read_ctx.missing_columns));
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, *_filter));
Block::erase_useless_column(block, column_to_keep);

if (block->rows() == 0) {
*eof = true;
return Status::OK();
}

if (!_not_single_slot_filter_conjuncts.empty()) {
std::vector<IColumn::Filter*> filters;
filters.push_back(_filter.get());
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, &filters, block, columns_to_filter,
column_to_keep)));
} else {
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, *_filter));
Block::erase_useless_column(block, column_to_keep);
}
} else {
uint64_t rr;
SCOPED_RAW_TIMER(&_statistics.column_read_time);
Expand Down Expand Up @@ -1457,10 +1475,16 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
batch_vec[orc_col_idx->second], _batch->numElements));
}
*read_rows = rr;

RETURN_IF_ERROR(
_fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));

if (block->rows() == 0) {
*eof = true;
return Status::OK();
}

_build_delete_row_filter(block, rr);

std::vector<uint32_t> columns_to_filter;
Expand All @@ -1483,17 +1507,40 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_delete_rows_filter_ptr) {
filters.push_back(_delete_rows_filter_ptr.get());
}
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
filter_conjuncts, &filters, block, columns_to_filter, column_to_keep)));
IColumn::Filter result_filter(block->rows(), 1);
bool can_filter_all = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
if (can_filter_all) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
}
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block, &batch_vec);
return Status::OK();
}
if (!_not_single_slot_filter_conjuncts.empty()) {
_convert_dict_cols_to_string_cols(block, &batch_vec);
std::vector<IColumn::Filter*> merged_filters;
merged_filters.push_back(&result_filter);
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, &merged_filters, block,
columns_to_filter, column_to_keep)));
} else {
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block, &batch_vec);
}
} else {
if (_delete_rows_filter_ptr) {
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter,
(*_delete_rows_filter_ptr)));
}
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block, &batch_vec);
}
_convert_dict_cols_to_string_cols(block, &batch_vec);
}
return Status::OK();
}
Expand Down Expand Up @@ -1581,8 +1628,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
_fill_partition_columns(block, size, _lazy_read_ctx.predicate_partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, size, _lazy_read_ctx.predicate_missing_columns));
if (_lazy_read_ctx.resize_first_column) {
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
// The following process may be tricky and time-consuming, but we have no other way.
block->get_by_position(0).column->assume_mutable()->resize(size);
_lazy_read_ctx.resize_first_column = true;
}

// transactional hive orc delete row
Expand All @@ -1608,6 +1656,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
filter_conjuncts, &filters, block, _filter.get(), &can_filter_all));

if (_lazy_read_ctx.resize_first_column) {
// We have to clean the first column to insert right data.
block->get_by_position(0).column->assume_mutable()->clear();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ class OrcReader : public GenericReader {

const TupleDescriptor* _tuple_descriptor;
const RowDescriptor* _row_descriptor;
VExprContextSPtrs _not_single_slot_filter_conjuncts;
const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts;
VExprContextSPtrs _dict_filter_conjuncts;
VExprContextSPtrs _non_dict_filter_conjuncts;
Expand Down
56 changes: 45 additions & 11 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ Status RowGroupReader::init(
_tuple_descriptor = tuple_descriptor;
_row_descriptor = row_descriptor;
_col_name_to_slot_id = colname_to_slot_id;
if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
_not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
not_single_slot_filter_conjuncts->begin(),
not_single_slot_filter_conjuncts->end());
}
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_text_converter.reset(new TextConverter('\\'));
if (not_single_slot_filter_conjuncts) {
_filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
not_single_slot_filter_conjuncts->end());
}
_merge_read_ranges(row_ranges);
if (_read_columns.empty()) {
// Query task that only select columns in path.
Expand Down Expand Up @@ -325,12 +326,32 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
if (_position_delete_ctx.has_filter) {
filters.push_back(_pos_delete_filter_ptr.get());
}

RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_filter_conjuncts, &filters, block, columns_to_filter,
column_to_keep)));
_convert_dict_cols_to_string_cols(block);
IColumn::Filter result_filter(block->rows(), 1);
bool can_filter_all = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
_filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
if (can_filter_all) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
}
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block);
return Status::OK();
}
if (!_not_single_slot_filter_conjuncts.empty()) {
_convert_dict_cols_to_string_cols(block);
std::vector<IColumn::Filter*> merged_filters;
merged_filters.push_back(&result_filter);
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, &merged_filters, block,
columns_to_filter, column_to_keep)));
} else {
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block);
}
} else {
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
Expand Down Expand Up @@ -404,7 +425,12 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
std::unique_ptr<ColumnSelectVector> select_vector_ptr = nullptr;
size_t pre_read_rows;
bool pre_eof;
std::vector<uint32_t> columns_to_filter;
size_t origin_column_num = block->columns();
columns_to_filter.resize(origin_column_num);
for (uint32_t i = 0; i < origin_column_num; ++i) {
columns_to_filter[i] = i;
}
IColumn::Filter result_filter;
while (true) {
// read predicate columns
Expand Down Expand Up @@ -538,7 +564,15 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re

*batch_eof = pre_eof;
RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
return _fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns);
RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns));
if (!_not_single_slot_filter_conjuncts.empty()) {
std::vector<IColumn::Filter*> filters;
filters.push_back(&result_filter);
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter,
origin_column_num)));
}
return Status::OK();
}

void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector,
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ class RowGroupReader {
const TupleDescriptor* _tuple_descriptor;
const RowDescriptor* _row_descriptor;
const std::unordered_map<std::string, int>* _col_name_to_slot_id;
VExprContextSPtrs _not_single_slot_filter_conjuncts;
const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts;
VExprContextSPtrs _dict_filter_conjuncts;
VExprContextSPtrs _filter_conjuncts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,27 @@ Z6n2t4XA2n7CXTECJ,PE,iBbsCh0RE1Dd2A,z48
-- !pr21598 --
5

-- !not_single_slot_filter_conjuncts_orc --
\N 289572 4 1980215 480218.00 24.00 31082.88 0.05 0 R F 1994-12-14 1995-01-01 COLLECT COD AIR final accounts. instructions boost above
\N 388932 2 6038830 538843.00 46.00 81352.38 0.02 0.06 A F 1994-12-15 1995-01-01 NONE MAIL ven ideas are furiously according
\N 452964 3 14917531 167546.00 20.00 30955.80 0.02 0.03 R F 1994-12-03 1995-01-01 COLLECT COD AIR deposits. blithely even deposits a
\N 570084 4 14861731 361760.00 26.00 43991.74 0.05 0.08 A F 1994-11-03 1995-01-01 COLLECT COD MAIL ending hockey players wake f
\N 637092 4 15648780 148811.00 26.00 44928.00 0.06 0.04 R F 1994-11-14 1995-01-01 COLLECT COD SHIP lar deposits. as
\N 1084260 2 6109231 609244.00 10.00 12399.30 0.01 0.03 R F 1994-11-05 1995-01-01 DELIVER IN PERSON RAIL efully pending sentiments. epita
\N 1150884 1 13245123 245124.00 49.00 52305.54 0.05 0.02 R F 1994-12-22 1995-01-01 DELIVER IN PERSON REG AIR rious deposits about the quickly bold
\N 1578180 1 19168165 918223.00 10.00 12322.10 0.07 0.07 R F 1994-10-31 1995-01-01 COLLECT COD TRUCK ges. accounts sublate carefully
\N 2073732 2 13846443 596483.00 21.00 29163.75 0.10 0.08 R F 1994-12-06 1995-01-01 DELIVER IN PERSON FOB dolphins nag furiously q
\N 2479044 4 9763795 13805.00 40.00 74332.40 0.05 0.05 R F 1994-11-16 1995-01-01 COLLECT COD RAIL equests hinder qu

-- !not_single_slot_filter_conjuncts_parquet --
\N 289572 4 1980215 480218.00 24.00 31082.88 0.05 0 R F 1994-12-14 1995-01-01 COLLECT COD AIR final accounts. instructions boost above
\N 388932 2 6038830 538843.00 46.00 81352.38 0.02 0.06 A F 1994-12-15 1995-01-01 NONE MAIL ven ideas are furiously according
\N 452964 3 14917531 167546.00 20.00 30955.80 0.02 0.03 R F 1994-12-03 1995-01-01 COLLECT COD AIR deposits. blithely even deposits a
\N 570084 4 14861731 361760.00 26.00 43991.74 0.05 0.08 A F 1994-11-03 1995-01-01 COLLECT COD MAIL ending hockey players wake f
\N 637092 4 15648780 148811.00 26.00 44928.00 0.06 0.04 R F 1994-11-14 1995-01-01 COLLECT COD SHIP lar deposits. as
\N 1084260 2 6109231 609244.00 10.00 12399.30 0.01 0.03 R F 1994-11-05 1995-01-01 DELIVER IN PERSON RAIL efully pending sentiments. epita
\N 1150884 1 13245123 245124.00 49.00 52305.54 0.05 0.02 R F 1994-12-22 1995-01-01 DELIVER IN PERSON REG AIR rious deposits about the quickly bold
\N 1578180 1 19168165 918223.00 10.00 12322.10 0.07 0.07 R F 1994-10-31 1995-01-01 COLLECT COD TRUCK ges. accounts sublate carefully
\N 2073732 2 13846443 596483.00 21.00 29163.75 0.10 0.08 R F 1994-12-06 1995-01-01 DELIVER IN PERSON FOB dolphins nag furiously q
\N 2479044 4 9763795 13805.00 40.00 74332.40 0.05 0.05 R F 1994-11-16 1995-01-01 COLLECT COD RAIL equests hinder qu

Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ suite("test_external_catalog_hive", "p2") {
// test #21598
qt_pr21598 """select count(*) from( (SELECT r_regionkey AS key1, r_name AS name, pday AS pday FROM (SELECT r_regionkey, r_name, replace(r_comment, ' ', 'aaaa') AS pday FROM ${catalog_name}.tpch_1000_parquet.region) t2))x;"""

// test not_single_slot_filter_conjuncts with dict filter issue
qt_not_single_slot_filter_conjuncts_orc """ select * from multi_catalog.lineitem_string_date_orc where l_commitdate < l_receiptdate and l_receiptdate = '1995-01-01' order by l_orderkey, l_partkey, l_suppkey, l_linenumber limit 10; """
qt_not_single_slot_filter_conjuncts_parquet """ select * from multi_catalog.lineitem_string_date_orc where l_commitdate < l_receiptdate and l_receiptdate = '1995-01-01' order by l_orderkey, l_partkey, l_suppkey, l_linenumber limit 10; """

// test remember last used database after switch / rename catalog
sql """switch ${catalog_name};"""

Expand Down

0 comments on commit 752cec9

Please sign in to comment.