[Enhancement] Improve parquet zonemap filter in row group level #52924

Open · wants to merge 7 commits into main
1 change: 1 addition & 0 deletions be/src/common/config.h
@@ -909,6 +909,7 @@ CONF_mBool(parquet_coalesce_read_enable, "true");
CONF_Bool(parquet_late_materialization_enable, "true");
CONF_Bool(parquet_page_index_enable, "true");
CONF_mBool(parquet_statistics_process_more_filter_enable, "true");
CONF_mBool(parquet_advance_zonemap_filter, "true");

CONF_Int32(io_coalesce_read_max_buffer_size, "8388608");
CONF_Int32(io_coalesce_read_max_distance_size, "1048576");
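A note on the new flag: CONF_mBool declares a runtime-mutable config (unlike CONF_Bool), so the new zone map path can be toggled without restarting the BE. A minimal sketch of how the flag gates the two paths, mirroring the check added in be/src/exec/hdfs_scanner.cpp below:

if (config::parquet_advance_zonemap_filter) {
    // new path: build a PredicateTree and evaluate zone maps per row group
} else {
    // legacy path: min/max conjuncts, bloom filters, and the optional
    // parquet_statistics_process_more_filter_enable pass
}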
19 changes: 19 additions & 0 deletions be/src/connector/hive_connector.cpp
@@ -218,6 +218,7 @@ Status HiveDataSource::_init_conjunct_ctxs(RuntimeState* state) {
_update_has_any_predicate();

RETURN_IF_ERROR(_decompose_conjunct_ctxs(state));
RETURN_IF_ERROR(_setup_all_conjunct_ctxs(state));
return Status::OK();
}

@@ -446,6 +447,23 @@ Status HiveDataSource::_decompose_conjunct_ctxs(RuntimeState* state) {
return Status::OK();
}

Status HiveDataSource::_setup_all_conjunct_ctxs(RuntimeState* state) {
// Clone conjuncts from _min_max_conjunct_ctxs and _conjunct_ctxs;
// the PredicateTree will then be generated from _all_conjunct_ctxs.
std::vector<ExprContext*> cloned_conjunct_ctxs;
RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _min_max_conjunct_ctxs, &cloned_conjunct_ctxs));
for (auto* ctx : cloned_conjunct_ctxs) {
_all_conjunct_ctxs.emplace_back(ctx);
}

cloned_conjunct_ctxs.clear();
RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _conjunct_ctxs, &cloned_conjunct_ctxs));
for (auto* ctx : cloned_conjunct_ctxs) {
_all_conjunct_ctxs.emplace_back(ctx);
}
return Status::OK();
}

void HiveDataSource::_init_counter(RuntimeState* state) {
const auto& hdfs_scan_node = _provider->_hdfs_scan_node;

@@ -637,6 +655,7 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {

scanner_params.can_use_any_column = _can_use_any_column;
scanner_params.can_use_min_max_count_opt = _can_use_min_max_count_opt;
scanner_params.all_conjunct_ctxs = _all_conjunct_ctxs;

HdfsScanner* scanner = nullptr;
auto format = scan_range.file_format;
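Why merge min/max and regular conjuncts into one list: the PredicateTree generated from _all_conjunct_ctxs lets row group pruning evaluate compound predicates, not only the top-level AND conjuncts the old min/max path could use. A self-contained toy model (plain C++, not StarRocks code) of the idea:

struct ZoneMap { int min; int max; };                              // row group min/max stats
bool may_match_gt(const ZoneMap& z, int v) { return z.max > v; }   // is col > v possible?
bool may_match_lt(const ZoneMap& z, int v) { return z.min < v; }   // is col < v possible?

// Row group with c1 in [0,3] and c2 in [7,9]:
//   (c1 > 10) OR (c2 < 5)  ->  false || false  ->  prune the whole row group.
// A filter restricted to top-level AND min/max conjuncts cannot prune on an OR.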
3 changes: 3 additions & 0 deletions be/src/connector/hive_connector.h
@@ -92,6 +92,7 @@ class HiveDataSource final : public DataSource {
Status _init_conjunct_ctxs(RuntimeState* state);
void _update_has_any_predicate();
Status _decompose_conjunct_ctxs(RuntimeState* state);
Status _setup_all_conjunct_ctxs(RuntimeState* state);
void _init_tuples_and_slots(RuntimeState* state);
void _init_counter(RuntimeState* state);
void _init_rf_counters();
@@ -119,6 +120,8 @@ class HiveDataSource final : public DataSource {
// ============ conjuncts =================
std::vector<ExprContext*> _min_max_conjunct_ctxs;

// Contains all conjuncts; used to generate the PredicateTree.
std::vector<ExprContext*> _all_conjunct_ctxs{};
// Complex conjuncts, e.g. those referencing multiple slots, are evaluated in the scanner.
std::vector<ExprContext*> _scanner_conjunct_ctxs;
// Conjuncts that reference only one slot.
2 changes: 1 addition & 1 deletion be/src/connector/lake_connector.cpp
@@ -250,7 +250,7 @@ Status LakeDataSource::init_reader_params(const std::vector<OlapScanRange*>& key
std::vector<uint32_t>& reader_columns) {
const TLakeScanNode& thrift_lake_scan_node = _provider->_t_lake_scan_node;
bool skip_aggregation = thrift_lake_scan_node.is_preaggregation;
auto parser = _obj_pool.add(new PredicateParser(_tablet_schema));
auto parser = _obj_pool.add(new OlapPredicateParser(_tablet_schema));
_params.is_pipeline = true;
_params.reader_type = READER_QUERY;
_params.skip_aggregation = skip_aggregation;
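The PredicateParser -> OlapPredicateParser rename here (and in olap_chunk_source.cpp and tablet_scanner.cpp below) suggests the parser was split into per-source implementations, with the ConnectorPredicateParser constructed in hdfs_scanner.cpp as the connector-side counterpart. A hedged sketch of the implied hierarchy; the real declarations live in be/src/storage/predicate_parser.h, and the constructor signatures below are assumptions inferred from the call sites:

class PredicateParser {
public:
    virtual ~PredicateParser() = default;
    // shared logic: turn conjuncts into ColumnPredicates, decide pushdown, ...
};

class OlapPredicateParser : public PredicateParser {
public:
    explicit OlapPredicateParser(TabletSchemaCSPtr tablet_schema);  // assumption: driven by the tablet schema
};

class ConnectorPredicateParser : public PredicateParser {
public:
    explicit ConnectorPredicateParser(const std::vector<SlotDescriptor*>* slot_descs);  // assumption: driven by slot descriptors
};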
19 changes: 19 additions & 0 deletions be/src/exec/hdfs_scanner.cpp
@@ -21,6 +21,8 @@
#include "io/cache_select_input_stream.hpp"
#include "io/compressed_input_stream.h"
#include "io/shared_buffered_input_stream.h"
#include "pipeline/fragment_context.h"
#include "storage/predicate_parser.h"
#include "util/compression/compression_utils.h"
#include "util/compression/stream_compression.h"

@@ -168,6 +170,23 @@ Status HdfsScanner::_build_scanner_context() {
ctx.split_context = _scanner_params.split_context;
ctx.enable_split_tasks = _scanner_params.enable_split_tasks;
ctx.connector_max_split_size = _scanner_params.connector_max_split_size;

if (config::parquet_advance_zonemap_filter) {
OlapScanConjunctsManagerOptions opts;
opts.conjunct_ctxs_ptr = &_scanner_params.all_conjunct_ctxs;
opts.tuple_desc = _scanner_params.tuple_desc;
opts.obj_pool = _runtime_state->obj_pool();
opts.runtime_filters = _scanner_params.runtime_filter_collector;
opts.runtime_state = _runtime_state;
opts.enable_column_expr_predicate = true;
opts.is_olap_scan = false;
opts.pred_tree_params = _runtime_state->fragment_ctx()->pred_tree_params();
ctx.conjuncts_manager = std::make_unique<OlapScanConjunctsManager>(std::move(opts));
RETURN_IF_ERROR(ctx.conjuncts_manager->parse_conjuncts());
ConnectorPredicateParser predicate_parser{&ctx.slot_descs};
ASSIGN_OR_RETURN(ctx.predicate_tree,
ctx.conjuncts_manager->get_predicate_tree(&predicate_parser, ctx.predicate_free_pool));
}
return Status::OK();
}

9 changes: 9 additions & 0 deletions be/src/exec/hdfs_scanner.h
@@ -17,6 +17,7 @@
#include <atomic>
#include <boost/algorithm/string.hpp>

#include "exec/olap_scan_prepare.h"
#include "exec/pipeline/scan/morsel.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
@@ -82,6 +83,8 @@ struct HdfsScanStats {
// page index
int64_t rows_before_page_index = 0;
int64_t page_index_ns = 0;
int64_t parquet_total_row_groups = 0;
int64_t parquet_filtered_row_groups = 0;

// late materialize round-by-round
int64_t group_min_round_cost = 0;
@@ -159,6 +162,7 @@ struct HdfsScannerParams {
// runtime bloom filter.
const RuntimeFilterProbeCollector* runtime_filter_collector = nullptr;

std::vector<ExprContext*> all_conjunct_ctxs;
// all conjuncts except `conjunct_ctxs_by_slot`, like compound predicates
std::vector<ExprContext*> scanner_conjunct_ctxs;
std::unordered_set<SlotId> slots_in_conjunct;
@@ -338,6 +342,11 @@ struct HdfsScannerContext {
Status evaluate_on_conjunct_ctxs_by_slot(ChunkPtr* chunk, Filter* filter);

void merge_split_tasks();

// used for parquet zone map filter only
std::unique_ptr<OlapScanConjunctsManager> conjuncts_manager = nullptr;
std::vector<std::unique_ptr<ColumnPredicate>> predicate_free_pool;
PredicateTree predicate_tree;
};

struct OpenFileOptions {
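On the three members just added to HdfsScannerContext: conjuncts_manager builds the tree, and, judging from the get_predicate_tree(parser, predicate_free_pool) call sites in hdfs_scanner.cpp and tablet_scanner.cpp, predicate_free_pool owns the ColumnPredicate objects while predicate_tree holds non-owning references into it, so the pool must outlive the tree. A comment-level sketch of that assumed relationship:

// Assumed ownership (inferred from member types and call sites, not verified):
std::vector<std::unique_ptr<ColumnPredicate>> predicate_free_pool;  // owns the predicates
PredicateTree predicate_tree;  // points into the pool; keep both together, as done here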
12 changes: 12 additions & 0 deletions be/src/exec/hdfs_scanner_parquet.cpp
@@ -18,6 +18,7 @@
#include "exec/iceberg/iceberg_delete_builder.h"
#include "exec/paimon/paimon_delete_file_builder.h"
#include "formats/parquet/file_reader.h"
#include "pipeline/fragment_context.h"
#include "util/runtime_profile.h"

namespace starrocks {
@@ -88,6 +89,9 @@ void HdfsParquetScanner::do_update_counter(HdfsScanProfile* profile) {
// page index
RuntimeProfile::Counter* rows_before_page_index = nullptr;
RuntimeProfile::Counter* page_index_timer = nullptr;
// filter stats
RuntimeProfile::Counter* total_row_groups = nullptr;
RuntimeProfile::Counter* filtered_row_groups = nullptr;

RuntimeProfile* root = profile->runtime_profile;
ADD_COUNTER(root, kParquetProfileSectionPrefix, TUnit::NONE);
@@ -128,6 +132,8 @@ void HdfsParquetScanner::do_update_counter(HdfsScanProfile* profile) {
kParquetProfileSectionPrefix);
rows_before_page_index = ADD_CHILD_COUNTER(root, "RowsBeforePageIndex", TUnit::UNIT, kParquetProfileSectionPrefix);
page_index_timer = ADD_CHILD_TIMER(root, "PageIndexTime", kParquetProfileSectionPrefix);
total_row_groups = ADD_CHILD_COUNTER(root, "TotalRowGroups", TUnit::UNIT, kParquetProfileSectionPrefix);
filtered_row_groups = ADD_CHILD_COUNTER(root, "FilteredRowGroups", TUnit::UNIT, kParquetProfileSectionPrefix);

COUNTER_UPDATE(request_bytes_read, _app_stats.request_bytes_read);
COUNTER_UPDATE(request_bytes_read_uncompressed, _app_stats.request_bytes_read_uncompressed);
@@ -153,6 +159,12 @@ void HdfsParquetScanner::do_update_counter(HdfsScanProfile* profile) {
do_update_iceberg_v2_counter(root, kParquetProfileSectionPrefix);
COUNTER_UPDATE(rows_before_page_index, _app_stats.rows_before_page_index);
COUNTER_UPDATE(page_index_timer, _app_stats.page_index_ns);
COUNTER_UPDATE(total_row_groups, _app_stats.parquet_total_row_groups);
COUNTER_UPDATE(filtered_row_groups, _app_stats.parquet_filtered_row_groups);
if (_scanner_ctx.conjuncts_manager != nullptr &&
_runtime_state->fragment_ctx()->pred_tree_params().enable_show_in_profile) {
root->add_info_string("ParquetPredicateTreeFilter", _scanner_ctx.predicate_tree.root().debug_string());
}
}

Status HdfsParquetScanner::do_open(RuntimeState* runtime_state) {
2 changes: 1 addition & 1 deletion be/src/exec/olap_scan_prepare.cpp
@@ -257,7 +257,7 @@ StatusOr<bool> ChunkPredicateBuilder<E, Type>::parse_conjuncts() {
RETURN_IF_ERROR(build_olap_filters());

// Only the root builder builds scan keys.
if (_is_root_builder) {
if (_is_root_builder && _opts.is_olap_scan) {
RETURN_IF_ERROR(build_scan_keys(_opts.scan_keys_unlimited, _opts.max_scan_key_num));
}

1 change: 1 addition & 0 deletions be/src/exec/olap_scan_prepare.h
@@ -46,6 +46,7 @@ struct OlapScanConjunctsManagerOptions {
bool scan_keys_unlimited = true;
int32_t max_scan_key_num = 1024;
bool enable_column_expr_predicate = false;
bool is_olap_scan = true;

PredicateTreeParams pred_tree_params;
};
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/olap_chunk_source.cpp
@@ -228,7 +228,7 @@ Status OlapChunkSource::_init_reader_params(const std::vector<std::unique_ptr<Ol
std::vector<uint32_t>& reader_columns) {
const TOlapScanNode& thrift_olap_scan_node = _scan_node->thrift_olap_scan_node();
bool skip_aggregation = thrift_olap_scan_node.is_preaggregation;
auto parser = _obj_pool.add(new PredicateParser(_tablet_schema));
auto parser = _obj_pool.add(new OlapPredicateParser(_tablet_schema));
_params.is_pipeline = true;
_params.reader_type = READER_QUERY;
_params.skip_aggregation = skip_aggregation;
2 changes: 1 addition & 1 deletion be/src/exec/tablet_scanner.cpp
@@ -144,7 +144,7 @@ Status TabletScanner::_init_reader_params(const std::vector<OlapScanRange*>* key
// to avoid unnecessary SerDe and improve query performance
_params.need_agg_finalize = _need_agg_finalize;
_params.use_page_cache = _runtime_state->use_page_cache();
auto parser = _pool.add(new PredicateParser(_tablet_schema));
auto parser = _pool.add(new OlapPredicateParser(_tablet_schema));

ASSIGN_OR_RETURN(auto pred_tree, _parent->_conjuncts_manager->get_predicate_tree(parser, _predicate_free_pool));

1 change: 1 addition & 0 deletions be/src/formats/CMakeLists.txt
@@ -78,6 +78,7 @@ add_library(Formats STATIC
parquet/column_chunk_writer.cpp
parquet/column_read_order_ctx.cpp
parquet/statistics_helper.cpp
parquet/zone_map_filter_evaluator.h
disk_range.hpp
)

11 changes: 11 additions & 0 deletions be/src/formats/parquet/column_reader.h
@@ -26,10 +26,12 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "common/statusor.h"
#include "formats/parquet/metadata.h"
#include "formats/parquet/types.h"
#include "formats/parquet/utils.h"
#include "io/shared_buffered_input_stream.h"
#include "storage/column_predicate.h"
#include "storage/predicate_tree/predicate_tree_fwd.h"
#include "storage/range.h"
#include "types/logical_type.h"

@@ -63,6 +65,7 @@ struct ColumnReaderOptions {
RandomAccessFile* file = nullptr;
const tparquet::RowGroup* row_group_meta = nullptr;
uint64_t first_row_index = 0;
const FileMetaData* file_meta_data = nullptr;
};

class StoredColumnReader;
@@ -150,6 +153,14 @@ class ColumnReader {

virtual void select_offset_index(const SparseRange<uint64_t>& range, const uint64_t rg_first_row) = 0;

virtual Status row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
const uint64_t rg_first_row, const uint64_t rg_num_rows) const {
// Not implemented: by default, select the whole row group.
row_ranges->add({rg_first_row, rg_first_row + rg_num_rows});
return Status::OK();
}

private:
// _parquet_field is generated by the parquet format, so the order of ParquetField's children may differ from ColumnReader's children.
const ParquetField* _parquet_field = nullptr;
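Since the base-class default keeps the whole row group, only readers that can interpret parquet statistics need to override row_group_zone_map_filter. A hedged sketch of what a scalar-column override might look like; ZoneMapDetail, zone_map_filter, and the stats-loading helper are assumptions standing in for whatever the real implementation uses (std::all_of/std::any_of come from <algorithm>):

Status row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
                                 SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
                                 const uint64_t rg_first_row, const uint64_t rg_num_rows) const override {
    ZoneMapDetail zone_map;                          // assumed: decoded min/max for this column
    if (!_try_load_row_group_zone_map(&zone_map)) {  // assumed helper; no usable stats -> keep all rows
        row_ranges->add({rg_first_row, rg_first_row + rg_num_rows});
        return Status::OK();
    }
    auto may_match = [&](const ColumnPredicate* pred) { return pred->zone_map_filter(zone_map); };
    // AND relation: keep the group only if every predicate may match;
    // OR relation: keep it if any predicate may match.
    bool keep = (pred_relation == CompoundNodeType::AND)
                        ? std::all_of(predicates.begin(), predicates.end(), may_match)
                        : std::any_of(predicates.begin(), predicates.end(), may_match);
    if (keep) {
        row_ranges->add({rg_first_row, rg_first_row + rg_num_rows});
    }
    return Status::OK();
}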
78 changes: 32 additions & 46 deletions be/src/formats/parquet/file_reader.cpp
@@ -49,6 +49,7 @@
#include "formats/parquet/schema.h"
#include "formats/parquet/statistics_helper.h"
#include "formats/parquet/utils.h"
#include "formats/parquet/zone_map_filter_evaluator.h"
#include "fs/fs.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/parquet_types.h"
@@ -305,7 +306,8 @@ bool FileReader::_filter_group_with_more_filter(const GroupReaderPtr& group_read
LOG(WARNING) << "Can't get " + slot->col_name() + "'s ParquetField in _read_min_max_chunk.";
continue;
}
auto st = _get_min_max_value(slot, column_meta, field, min_values, max_values);
auto st = StatisticsHelper::get_min_max_value(_file_metadata.get(), slot->type(), column_meta,
field, min_values, max_values);
if (!st.ok()) continue;
Filter selected(min_values.size(), 1);
st = StatisticsHelper::in_filter_on_min_max_stat(min_values, max_values, ctx, field,
@@ -324,19 +326,33 @@
// Errors may occur during row group filtering; ignore them instead of returning an error
// status and failing the query.
bool FileReader::_filter_group(const GroupReaderPtr& group_reader) {
if (_filter_group_with_min_max_conjuncts(group_reader)) {
return true;
}
if (config::parquet_advance_zonemap_filter) {
auto res = _scanner_ctx->predicate_tree.visit(
ZoneMapEvaluator<FilterLevel::ROW_GROUP>{_scanner_ctx->predicate_tree, group_reader.get()});
if (!res.ok()) {
LOG(WARNING) << "filter row group failed: " << res.status().message();
return false;
}
if (res.value().has_value() && res.value()->empty()) {
// no rows selected, the whole row group can be filtered
return true;
}
return false;
} else {
if (_filter_group_with_min_max_conjuncts(group_reader)) {
return true;
}

if (_filter_group_with_bloom_filter_min_max_conjuncts(group_reader)) {
return true;
}
if (_filter_group_with_bloom_filter_min_max_conjuncts(group_reader)) {
return true;
}

if (config::parquet_statistics_process_more_filter_enable && _filter_group_with_more_filter(group_reader)) {
return true;
}
if (config::parquet_statistics_process_more_filter_enable && _filter_group_with_more_filter(group_reader)) {
return true;
}

return false;
return false;
}
}

Status FileReader::_read_min_max_chunk(const GroupReaderPtr& group_reader, const std::vector<SlotDescriptor*>& slots,
@@ -381,7 +397,8 @@ Status FileReader::_read_min_max_chunk(const GroupReaderPtr& group_reader, const
return Status::InternalError(strings::Substitute("Can't get $0 field", slot->col_name()));
}

RETURN_IF_ERROR(_get_min_max_value(slot, column_meta, field, min_values, max_values));
RETURN_IF_ERROR(StatisticsHelper::get_min_max_value(_file_metadata.get(), slot->type(), column_meta, field,
min_values, max_values));
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column((*min_chunk)->columns()[i], min_values,
slot->type(), field, ctx.timezone));
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column((*max_chunk)->columns()[i], max_values,
@@ -401,40 +418,6 @@ int32_t FileReader::_get_partition_column_idx(const std::string& col_name) const
return -1;
}

Status FileReader::_get_min_max_value(const SlotDescriptor* slot, const tparquet::ColumnMetaData* column_meta,
const ParquetField* field, std::vector<std::string>& min_values,
std::vector<std::string>& max_values) const {
// When statistics is empty, column_meta->__isset.statistics is still true,
// but statistics.__isset.xxx may be false, so judgment is required here.
bool is_set_min_max = (column_meta->statistics.__isset.max && column_meta->statistics.__isset.min) ||
(column_meta->statistics.__isset.max_value && column_meta->statistics.__isset.min_value);
if (!is_set_min_max) {
return Status::Aborted("No exist min/max");
}

DCHECK_EQ(field->physical_type, column_meta->type);
auto sort_order = sort_order_of_logical_type(slot->type().type);

if (!_has_correct_min_max_stats(*column_meta, sort_order)) {
return Status::Aborted("The file has incorrect order");
}

if (column_meta->statistics.__isset.min_value) {
min_values.emplace_back(column_meta->statistics.min_value);
max_values.emplace_back(column_meta->statistics.max_value);
} else {
min_values.emplace_back(column_meta->statistics.min);
max_values.emplace_back(column_meta->statistics.max);
}

return Status::OK();
}

bool FileReader::_has_correct_min_max_stats(const tparquet::ColumnMetaData& column_meta,
const SortOrder& sort_order) const {
return _file_metadata->writer_version().HasCorrectStatistics(column_meta, sort_order);
}

void FileReader::_prepare_read_columns(std::unordered_set<std::string>& existed_column_names) {
_meta_helper->prepare_read_columns(_scanner_ctx->materialized_columns, _group_reader_param.read_cols,
existed_column_names);
@@ -483,9 +466,12 @@ Status FileReader::_init_group_readers() {
std::make_shared<GroupReader>(_group_reader_param, i, _need_skip_rowids, row_group_first_row);
RETURN_IF_ERROR(row_group_reader->init());

_group_reader_param.stats->parquet_total_row_groups += 1;

// You should call row_group_reader->init() before _filter_group()
if (_filter_group(row_group_reader)) {
DLOG(INFO) << "row group " << i << " of file has been filtered by min/max conjunct";
_group_reader_param.stats->parquet_filtered_row_groups += 1;
continue;
}

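Tying the pieces together: under config::parquet_advance_zonemap_filter, _filter_group delegates to the ZoneMapEvaluator visitor, and the res.value()->empty() check above is what prunes the group. For that check to be sufficient, compound nodes presumably combine their children's selected row ranges by intersection under AND and by union under OR — assumed semantics, illustrated on a row group spanning rows [0, 1000):

// Assumed combination rules (not taken verbatim from the patch):
//   AND node: selected = intersection of children's selected ranges
//   OR  node: selected = union of children's selected ranges
// Child A selects {} (its predicate cannot match per the zone maps),
// child B selects [0, 1000):
//   AND(A, B) -> {}        -> _filter_group returns true, row group skipped
//   OR(A, B)  -> [0, 1000) -> row group kept and read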