From 62ba77c613bb38c778a5e6228d034737aa09e325 Mon Sep 17 00:00:00 2001
From: Smith Cruise
Date: Thu, 14 Nov 2024 20:42:24 +0800
Subject: [PATCH 1/7] init

Signed-off-by: Smith Cruise
---
 be/src/common/config.h                        |   1 +
 be/src/connector/hive_connector.cpp           |  14 ++
 be/src/connector/hive_connector.h             |   2 +
 be/src/connector/lake_connector.cpp           |   2 +-
 be/src/exec/hdfs_scanner.cpp                  |  19 ++-
 be/src/exec/hdfs_scanner.h                    |  11 ++
 be/src/exec/hdfs_scanner_parquet.cpp          |  10 ++
 be/src/exec/olap_scan_prepare.cpp             |   2 +-
 be/src/exec/olap_scan_prepare.h               |   1 +
 .../exec/pipeline/scan/olap_chunk_source.cpp  |   2 +-
 be/src/exec/tablet_scanner.cpp                |   2 +-
 be/src/formats/CMakeLists.txt                 |   1 +
 be/src/formats/parquet/column_reader.h        |  11 ++
 be/src/formats/parquet/file_reader.cpp        |  37 +++--
 be/src/formats/parquet/file_reader.h          |   2 +
 be/src/formats/parquet/group_reader.cpp       |   9 ++
 be/src/formats/parquet/group_reader.h         |   2 +
 .../formats/parquet/scalar_column_reader.cpp  |  67 +++++++++
 be/src/formats/parquet/scalar_column_reader.h |   4 +
 be/src/formats/parquet/statistics_helper.cpp  |  35 +++++
 be/src/formats/parquet/statistics_helper.h    |  10 +-
 .../parquet/zone_map_filter_evaluator.h       |  80 ++++++++++
 be/src/storage/lake/tablet_reader.cpp         |   2 +-
 be/src/storage/predicate_parser.cpp           | 138 +++++++++++++-----
 be/src/storage/predicate_parser.h             |  64 +++++++-
 be/src/storage/tablet_reader.cpp              |   2 +-
 26 files changed, 469 insertions(+), 61 deletions(-)
 create mode 100644 be/src/formats/parquet/zone_map_filter_evaluator.h

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1f2a5468b5ba5..5e6575d33b05c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -909,6 +909,7 @@ CONF_mBool(parquet_coalesce_read_enable, "true");
 CONF_Bool(parquet_late_materialization_enable, "true");
 CONF_Bool(parquet_page_index_enable, "true");
 CONF_mBool(parquet_statistics_process_more_filter_enable, "true");
+CONF_mBool(parquet_advance_zonemap_filter, "true");
 CONF_Int32(io_coalesce_read_max_buffer_size, "8388608");
 CONF_Int32(io_coalesce_read_max_distance_size, "1048576");
diff --git a/be/src/connector/hive_connector.cpp b/be/src/connector/hive_connector.cpp
index 117ae3be18aed..490d1369b5e1a 100644
--- a/be/src/connector/hive_connector.cpp
+++ b/be/src/connector/hive_connector.cpp
@@ -218,6 +218,19 @@ Status HiveDataSource::_init_conjunct_ctxs(RuntimeState* state) {
     _update_has_any_predicate();
 
     RETURN_IF_ERROR(_decompose_conjunct_ctxs(state));
+    {
+        std::vector<ExprContext*> cloned_conjunct_ctxs;
+        RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _min_max_conjunct_ctxs, &cloned_conjunct_ctxs));
+        for (auto* ctx : cloned_conjunct_ctxs) {
+            _all_conjunct_ctxs.emplace_back(ctx);
+        }
+
+        cloned_conjunct_ctxs.clear();
+        RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _conjunct_ctxs, &cloned_conjunct_ctxs));
+        for (auto* ctx : cloned_conjunct_ctxs) {
+            _all_conjunct_ctxs.emplace_back(ctx);
+        }
+    }
     return Status::OK();
 }
 
@@ -637,6 +650,7 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) {
 
     scanner_params.can_use_any_column = _can_use_any_column;
     scanner_params.can_use_min_max_count_opt = _can_use_min_max_count_opt;
+    scanner_params.all_conjunct_ctxs = _all_conjunct_ctxs;
 
     HdfsScanner* scanner = nullptr;
     auto format = scan_range.file_format;
diff --git a/be/src/connector/hive_connector.h b/be/src/connector/hive_connector.h
index 79b41b9d39e24..f2aa2e6d181a3 100644
--- a/be/src/connector/hive_connector.h
+++ b/be/src/connector/hive_connector.h
@@ -119,6 +119,8 @@ class HiveDataSource final : public DataSource {
 
     // ============ conjuncts =================
     std::vector<ExprContext*> _min_max_conjunct_ctxs;
+    // whole conjuncts, used to generate PredicateTree
+    std::vector<ExprContext*> _all_conjunct_ctxs{};
     // complex conjuncts, such as contains multi slot, are evaled in scanner.
     std::vector<ExprContext*> _scanner_conjunct_ctxs;
     // conjuncts that contains only one slot.
diff --git a/be/src/connector/lake_connector.cpp b/be/src/connector/lake_connector.cpp
index c4670c124eaa3..3dc4bec7354e9 100644
--- a/be/src/connector/lake_connector.cpp
+++ b/be/src/connector/lake_connector.cpp
@@ -250,7 +250,7 @@ Status LakeDataSource::init_reader_params(const std::vector<OlapScanRange*>& key
                                           std::vector<uint32_t>& reader_columns) {
     const TLakeScanNode& thrift_lake_scan_node = _provider->_t_lake_scan_node;
     bool skip_aggregation = thrift_lake_scan_node.is_preaggregation;
-    auto parser = _obj_pool.add(new PredicateParser(_tablet_schema));
+    auto parser = _obj_pool.add(new OlapPredicateParser(_tablet_schema));
     _params.is_pipeline = true;
     _params.reader_type = READER_QUERY;
     _params.skip_aggregation = skip_aggregation;
diff --git a/be/src/exec/hdfs_scanner.cpp b/be/src/exec/hdfs_scanner.cpp
index 474c776424868..0833634417c0d 100644
--- a/be/src/exec/hdfs_scanner.cpp
+++ b/be/src/exec/hdfs_scanner.cpp
@@ -21,6 +21,7 @@
 #include "io/cache_select_input_stream.hpp"
 #include "io/compressed_input_stream.h"
 #include "io/shared_buffered_input_stream.h"
+#include "storage/predicate_parser.h"
 #include "util/compression/compression_utils.h"
 #include "util/compression/stream_compression.h"
 
@@ -87,7 +88,6 @@ Status HdfsScanner::init(RuntimeState* runtime_state, const HdfsScannerParams& s
     _scanner_params = scanner_params;
 
     RETURN_IF_ERROR(do_init(runtime_state, scanner_params));
-
     return Status::OK();
 }
 
@@ -168,6 +168,23 @@ Status HdfsScanner::_build_scanner_context() {
     ctx.split_context = _scanner_params.split_context;
     ctx.enable_split_tasks = _scanner_params.enable_split_tasks;
     ctx.connector_max_split_size = _scanner_params.connector_max_split_size;
+
+    if (config::parquet_advance_zonemap_filter) {
+        OlapScanConjunctsManagerOptions opts;
+        opts.conjunct_ctxs_ptr = &_scanner_params.all_conjunct_ctxs;
+        opts.tuple_desc = _scanner_params.tuple_desc;
+        opts.obj_pool = _runtime_state->obj_pool();
+        opts.runtime_filters = _scanner_params.runtime_filter_collector;
+        opts.runtime_state = _runtime_state;
+        opts.enable_column_expr_predicate = true;
+        opts.is_olap_scan = false;
+        opts.pred_tree_params.enable_or = true;
+        ctx.conjuncts_manager = std::make_unique<OlapScanConjunctsManager>(std::move(opts));
+        RETURN_IF_ERROR(ctx.conjuncts_manager->parse_conjuncts());
+        ConnectorPredicateParser predicate_parser{&ctx.slot_descs};
+        ASSIGN_OR_RETURN(ctx.predicate_tree,
+                         ctx.conjuncts_manager->get_predicate_tree(&predicate_parser, ctx.predicate_free_pool));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/exec/hdfs_scanner.h b/be/src/exec/hdfs_scanner.h
index 35f8642fb7968..22c9c7a7a9fd3 100644
--- a/be/src/exec/hdfs_scanner.h
+++ b/be/src/exec/hdfs_scanner.h
@@ -17,6 +17,7 @@
 #include
 #include
 
+#include "exec/olap_scan_prepare.h"
 #include "exec/pipeline/scan/morsel.h"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
@@ -82,6 +83,8 @@ struct HdfsScanStats {
     // page index
     int64_t rows_before_page_index = 0;
     int64_t page_index_ns = 0;
+    int64_t parquet_total_row_groups = 0;
+    int64_t parquet_filtered_row_groups = 0;
 
     // late materialize round-by-round
     int64_t group_min_round_cost = 0;
@@ -159,6 +162,7 @@ struct HdfsScannerParams {
     // runtime bloom filter.
     const RuntimeFilterProbeCollector* runtime_filter_collector = nullptr;
 
+    std::vector<ExprContext*> all_conjunct_ctxs;
     // all conjuncts except `conjunct_ctxs_by_slot`, like compound predicates
     std::vector<ExprContext*> scanner_conjunct_ctxs;
     std::unordered_set<SlotId> slots_in_conjunct;
@@ -338,6 +342,12 @@ struct HdfsScannerContext {
     Status evaluate_on_conjunct_ctxs_by_slot(ChunkPtr* chunk, Filter* filter);
 
     void merge_split_tasks();
+
+    // used for parquet zone map filter only
+    std::unique_ptr<OlapScanConjunctsManager> conjuncts_manager = nullptr;
+    using PredicatePtr = std::unique_ptr<ColumnPredicate>;
+    std::vector<PredicatePtr> predicate_free_pool;
+    PredicateTree predicate_tree;
 };
 
 struct OpenFileOptions {
@@ -398,6 +408,7 @@ class HdfsScanner {
     std::atomic<bool> _closed = false;
     Status _build_scanner_context();
    void update_hdfs_counter(HdfsScanProfile* profile);
+    std::unique_ptr<OlapScanConjunctsManager> _conjuncts_manager;
 
 protected:
     HdfsScannerContext _scanner_ctx;
diff --git a/be/src/exec/hdfs_scanner_parquet.cpp b/be/src/exec/hdfs_scanner_parquet.cpp
index 72c77de733858..a20323f9693b4 100644
--- a/be/src/exec/hdfs_scanner_parquet.cpp
+++ b/be/src/exec/hdfs_scanner_parquet.cpp
@@ -88,6 +88,9 @@ void HdfsParquetScanner::do_update_counter(HdfsScanProfile* profile) {
     // page index
     RuntimeProfile::Counter* rows_before_page_index = nullptr;
     RuntimeProfile::Counter* page_index_timer = nullptr;
+    // filter stats
+    RuntimeProfile::Counter* total_row_groups = nullptr;
+    RuntimeProfile::Counter* filtered_row_groups = nullptr;
 
     RuntimeProfile* root = profile->runtime_profile;
     ADD_COUNTER(root, kParquetProfileSectionPrefix, TUnit::NONE);
@@ -128,6 +131,8 @@ void HdfsParquetScanner::do_update_counter(HdfsScanProfile* profile) {
                                                       kParquetProfileSectionPrefix);
     rows_before_page_index = ADD_CHILD_COUNTER(root, "RowsBeforePageIndex", TUnit::UNIT, kParquetProfileSectionPrefix);
     page_index_timer = ADD_CHILD_TIMER(root, "PageIndexTime", kParquetProfileSectionPrefix);
+    total_row_groups = ADD_CHILD_COUNTER(root, "TotalRowGroups", TUnit::UNIT, kParquetProfileSectionPrefix);
+    filtered_row_groups = ADD_CHILD_COUNTER(root, "FilteredRowGroups", TUnit::UNIT, kParquetProfileSectionPrefix);
 
     COUNTER_UPDATE(request_bytes_read, _app_stats.request_bytes_read);
     COUNTER_UPDATE(request_bytes_read_uncompressed, _app_stats.request_bytes_read_uncompressed);
@@ -153,6 +158,11 @@ void HdfsParquetScanner::do_update_counter(HdfsScanProfile* profile) {
     do_update_iceberg_v2_counter(root, kParquetProfileSectionPrefix);
     COUNTER_UPDATE(rows_before_page_index, _app_stats.rows_before_page_index);
     COUNTER_UPDATE(page_index_timer, _app_stats.page_index_ns);
+    COUNTER_UPDATE(total_row_groups, _app_stats.parquet_total_row_groups);
+    COUNTER_UPDATE(filtered_row_groups, _app_stats.parquet_filtered_row_groups);
+    if (_scanner_ctx.conjuncts_manager != nullptr) {
+        root->add_info_string("ParquetZoneMapFilter", _scanner_ctx.predicate_tree.root().debug_string());
+    }
 }
 
 Status HdfsParquetScanner::do_open(RuntimeState* runtime_state) {
diff --git a/be/src/exec/olap_scan_prepare.cpp b/be/src/exec/olap_scan_prepare.cpp
index cf29a94c01e78..9031351ab52dc 100644
--- a/be/src/exec/olap_scan_prepare.cpp
+++ b/be/src/exec/olap_scan_prepare.cpp
@@ -257,7 +257,7 @@ StatusOr<bool> ChunkPredicateBuilder<E, Type>::parse_conjuncts() {
     RETURN_IF_ERROR(build_olap_filters());
 
     // Only the root builder builds scan keys.
-    if (_is_root_builder) {
+    if (_is_root_builder && _opts.is_olap_scan) {
         RETURN_IF_ERROR(build_scan_keys(_opts.scan_keys_unlimited, _opts.max_scan_key_num));
     }
 
diff --git a/be/src/exec/olap_scan_prepare.h b/be/src/exec/olap_scan_prepare.h
index d38d9b22a0c84..817c31b2eded3 100644
--- a/be/src/exec/olap_scan_prepare.h
+++ b/be/src/exec/olap_scan_prepare.h
@@ -46,6 +46,7 @@ struct OlapScanConjunctsManagerOptions {
     bool scan_keys_unlimited = true;
     int32_t max_scan_key_num = 1024;
     bool enable_column_expr_predicate = false;
+    bool is_olap_scan = true;
 
     PredicateTreeParams pred_tree_params;
 };
diff --git a/be/src/exec/pipeline/scan/olap_chunk_source.cpp b/be/src/exec/pipeline/scan/olap_chunk_source.cpp
index 05452ae4cbd63..d71d275bb5aaf 100644
--- a/be/src/exec/pipeline/scan/olap_chunk_source.cpp
+++ b/be/src/exec/pipeline/scan/olap_chunk_source.cpp
@@ -228,7 +228,7 @@ Status OlapChunkSource::_init_reader_params(const std::vector<OlapScanRange*>& key
                                             std::vector<uint32_t>& reader_columns) {
     const TOlapScanNode& thrift_olap_scan_node = _scan_node->thrift_olap_scan_node();
     bool skip_aggregation = thrift_olap_scan_node.is_preaggregation;
-    auto parser = _obj_pool.add(new PredicateParser(_tablet_schema));
+    auto parser = _obj_pool.add(new OlapPredicateParser(_tablet_schema));
     _params.is_pipeline = true;
     _params.reader_type = READER_QUERY;
     _params.skip_aggregation = skip_aggregation;
diff --git a/be/src/exec/tablet_scanner.cpp b/be/src/exec/tablet_scanner.cpp
index e5adbb8cd7178..713dc18e616bc 100644
--- a/be/src/exec/tablet_scanner.cpp
+++ b/be/src/exec/tablet_scanner.cpp
@@ -144,7 +144,7 @@ Status TabletScanner::_init_reader_params(const std::vector<OlapScanRange*>* key
     // to avoid the unnecessary SerDe and improve query performance
     _params.need_agg_finalize = _need_agg_finalize;
     _params.use_page_cache = _runtime_state->use_page_cache();
-    auto parser = _pool.add(new PredicateParser(_tablet_schema));
+    auto parser = _pool.add(new OlapPredicateParser(_tablet_schema));
 
     ASSIGN_OR_RETURN(auto pred_tree, _parent->_conjuncts_manager->get_predicate_tree(parser, _predicate_free_pool));
 
diff --git a/be/src/formats/CMakeLists.txt b/be/src/formats/CMakeLists.txt
index 1d00ad5c20136..280d9e54fe7ac 100644
--- a/be/src/formats/CMakeLists.txt
+++ b/be/src/formats/CMakeLists.txt
@@ -78,6 +78,7 @@ add_library(Formats STATIC
         parquet/column_chunk_writer.cpp
         parquet/column_read_order_ctx.cpp
         parquet/statistics_helper.cpp
+        parquet/zone_map_filter_evaluator.h
         disk_range.hpp
 )
 
diff --git a/be/src/formats/parquet/column_reader.h b/be/src/formats/parquet/column_reader.h
index 3434625922ede..0f99f44a29457 100644
--- a/be/src/formats/parquet/column_reader.h
+++ b/be/src/formats/parquet/column_reader.h
@@ -26,10 +26,12 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "common/statusor.h"
+#include "formats/parquet/metadata.h"
 #include "formats/parquet/types.h"
 #include "formats/parquet/utils.h"
 #include "io/shared_buffered_input_stream.h"
 #include "storage/column_predicate.h"
+#include "storage/predicate_tree/predicate_tree_fwd.h"
 #include "storage/range.h"
 #include "types/logical_type.h"
 
@@ -63,6 +65,7 @@ struct ColumnReaderOptions {
     RandomAccessFile* file = nullptr;
     const tparquet::RowGroup* row_group_meta = nullptr;
     uint64_t first_row_index = 0;
+    const FileMetaData* file_meta_data = nullptr;
 };
 
 class StoredColumnReader;
@@ -150,6 +153,14 @@ class ColumnReader {
 
     virtual void select_offset_index(const SparseRange<uint64_t>& range, const uint64_t rg_first_row) = 0;
 
+    virtual Status row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
+                                             SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
+                                             const uint64_t rg_first_row, const uint64_t rg_num_rows) const {
+        // not implemented, means select the whole row group
+        row_ranges->add({rg_first_row, rg_first_row + rg_num_rows});
+        return Status::OK();
+    }
+
 private:
     // _parquet_field is generated by parquet format, so ParquetField's children order may different from ColumnReader's children.
     const ParquetField* _parquet_field = nullptr;
diff --git a/be/src/formats/parquet/file_reader.cpp b/be/src/formats/parquet/file_reader.cpp
index ba55543e29558..e5b79330cbacc 100644
--- a/be/src/formats/parquet/file_reader.cpp
+++ b/be/src/formats/parquet/file_reader.cpp
@@ -49,6 +49,7 @@
 #include "formats/parquet/schema.h"
 #include "formats/parquet/statistics_helper.h"
 #include "formats/parquet/utils.h"
+#include "formats/parquet/zone_map_filter_evaluator.h"
 #include "fs/fs.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/parquet_types.h"
@@ -324,19 +325,32 @@ bool FileReader::_filter_group_with_more_filter(const GroupReaderPtr& group_read
 // when doing row group filter, there maybe some error, but we'd better just ignore it instead of returning the error
 // status and lead to the query failed.
 bool FileReader::_filter_group(const GroupReaderPtr& group_reader) {
-    if (_filter_group_with_min_max_conjuncts(group_reader)) {
-        return true;
-    }
+    if (_scanner_ctx->conjuncts_manager != nullptr) {
+        auto res = _scanner_ctx->predicate_tree.visit(
+                ZoneMapEvaluator<FilterLevel::ROW_GROUP>{_scanner_ctx->predicate_tree, group_reader});
+        if (!res.ok()) {
+            LOG(WARNING) << "filter row group failed: " << res.status().message();
+            return false;
+        }
+        if (res.value().has_value() && res.value()->empty()) {
+            return true;
+        }
+        return false;
+    } else {
+        if (_filter_group_with_min_max_conjuncts(group_reader)) {
+            return true;
+        }
 
-    if (_filter_group_with_bloom_filter_min_max_conjuncts(group_reader)) {
-        return true;
-    }
+        if (_filter_group_with_bloom_filter_min_max_conjuncts(group_reader)) {
+            return true;
+        }
 
-    if (config::parquet_statistics_process_more_filter_enable && _filter_group_with_more_filter(group_reader)) {
-        return true;
-    }
+        if (config::parquet_statistics_process_more_filter_enable && _filter_group_with_more_filter(group_reader)) {
+            return true;
+        }
 
-    return false;
+        return false;
+    }
 }
 
 Status FileReader::_read_min_max_chunk(const GroupReaderPtr& group_reader, const std::vector<SlotDescriptor*>& slots,
@@ -483,9 +497,12 @@ Status FileReader::_init_group_readers() {
                 std::make_shared<GroupReader>(_group_reader_param, i, _need_skip_rowids, row_group_first_row);
         RETURN_IF_ERROR(row_group_reader->init());
 
+        _group_reader_param.stats->parquet_total_row_groups += 1;
+
         // You should call row_group_reader->init() before _filter_group()
         if (_filter_group(row_group_reader)) {
             DLOG(INFO) << "row group " << i << " of file has been filtered by min/max conjunct";
+            _group_reader_param.stats->parquet_filtered_row_groups += 1;
             continue;
         }
 
diff --git a/be/src/formats/parquet/file_reader.h b/be/src/formats/parquet/file_reader.h
index 077dc02ac89c7..5f5f12de54a7c 100644
--- a/be/src/formats/parquet/file_reader.h
+++ b/be/src/formats/parquet/file_reader.h
@@ -111,6 +111,8 @@ class FileReader {
     Status _read_min_max_chunk(const GroupReaderPtr& group_reader, const std::vector<SlotDescriptor*>& slots,
                                ChunkPtr* min_chunk, ChunkPtr* max_chunk) const;
 
+    // StatusOr> _read_zone_map_from_row_group(const tparquet::RowGroup& row_group, const std::vector& slot);
+
     // only scan partition column + not exist column
     Status _exec_no_materialized_column_scan(ChunkPtr* chunk);
 
diff --git a/be/src/formats/parquet/group_reader.cpp b/be/src/formats/parquet/group_reader.cpp
index b74be7a14c35c..ed8c541c59a84 100644
--- a/be/src/formats/parquet/group_reader.cpp
+++ b/be/src/formats/parquet/group_reader.cpp
@@ -133,6 +133,14 @@ const tparquet::ColumnChunk* GroupReader::get_chunk_metadata(SlotId slot_id) {
     return it->second->get_chunk_metadata();
 }
 
+const ColumnReader* GroupReader::get_column_reader(SlotId slot_id) {
+    const auto& it = _column_readers.find(slot_id);
+    if (it == _column_readers.end()) {
+        return nullptr;
+    }
+    return it->second.get();
+}
+
 const ParquetField* GroupReader::get_column_parquet_field(SlotId slot_id) {
     const auto& it = _column_readers.find(slot_id);
     if (it == _column_readers.end()) {
@@ -304,6 +312,7 @@ Status GroupReader::_create_column_readers() {
     SCOPED_RAW_TIMER(&_param.stats->column_reader_init_ns);
     // ColumnReaderOptions is used by all column readers in one row group
     ColumnReaderOptions& opts = _column_reader_opts;
+    opts.file_meta_data = _param.file_metadata;
     opts.timezone = _param.timezone;
     opts.case_sensitive = _param.case_sensitive;
     opts.chunk_size = _param.chunk_size;
diff --git a/be/src/formats/parquet/group_reader.h b/be/src/formats/parquet/group_reader.h
index 76a04faddcef6..ca84a7a9998dd 100644
--- a/be/src/formats/parquet/group_reader.h
+++ b/be/src/formats/parquet/group_reader.h
@@ -112,6 +112,8 @@ class GroupReader {
     Status prepare();
     const tparquet::ColumnChunk* get_chunk_metadata(SlotId slot_id);
     const ParquetField* get_column_parquet_field(SlotId slot_id);
+    const ColumnReader* get_column_reader(SlotId slot_id);
+    uint64_t get_row_group_first_row() const { return _row_group_first_row; }
     const tparquet::RowGroup* get_row_group_metadata() const;
     Status get_next(ChunkPtr* chunk, size_t* row_count);
     void collect_io_ranges(std::vector<io::SharedBufferedInputStream::IORange>* ranges, int64_t* end_offset,
diff --git a/be/src/formats/parquet/scalar_column_reader.cpp b/be/src/formats/parquet/scalar_column_reader.cpp
index b8d9e4ccff654..9cec9d0e59895 100644
--- a/be/src/formats/parquet/scalar_column_reader.cpp
+++ b/be/src/formats/parquet/scalar_column_reader.cpp
@@ -18,6 +18,7 @@
 #include "gutil/casts.h"
 #include "io/shared_buffered_input_stream.h"
 #include "simd/simd.h"
+#include "statistics_helper.h"
 #include "utils.h"
 
 namespace starrocks::parquet {
@@ -254,4 +255,70 @@ void ScalarColumnReader::select_offset_index(const SparseRange<uint64_t>& range,
         _reader = std::make_unique(std::move(_reader), _offset_index_ctx.get(), has_dict_page);
 }
 
+Status ScalarColumnReader::row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
+                                                     SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
+                                                     const uint64_t rg_first_row, const uint64_t rg_num_rows) const {
+    const uint64_t rg_end_row = rg_first_row + rg_num_rows;
+
+    if (!get_chunk_metadata()->meta_data.__isset.statistics || get_column_parquet_field() == nullptr) {
+        // statistics does not exist, select all
+        row_ranges->add({rg_first_row, rg_end_row});
+        return Status::OK();
+    }
+
+    bool has_null = true;
+    bool is_all_null = false;
+
+    if (get_chunk_metadata()->meta_data.statistics.__isset.null_count) {
+        has_null = get_chunk_metadata()->meta_data.statistics.null_count > 0;
+        is_all_null = get_chunk_metadata()->meta_data.statistics.null_count == rg_num_rows;
+    }
+
+    std::optional<ZoneMapDetail> zone_map_detail = std::nullopt;
+    if (is_all_null) {
+        // if the entire column's value is null, the min/max value does not exist
+        zone_map_detail = ZoneMapDetail{Datum{}, Datum{}, true};
+        zone_map_detail->set_num_rows(rg_num_rows);
+    } else {
+        std::vector<std::string> min_values;
+        std::vector<std::string> max_values;
+        Status st =
+                StatisticsHelper::get_min_max_value(_opts.file_meta_data, *_col_type, &get_chunk_metadata()->meta_data,
+                                                    get_column_parquet_field(), min_values, max_values);
+        if (st.ok()) {
+            const ColumnPtr min_column = ColumnHelper::create_column(*_col_type, true);
+            const ColumnPtr max_column = ColumnHelper::create_column(*_col_type, true);
+
+            RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(min_column, min_values, *_col_type,
+                                                                       get_column_parquet_field(), _opts.timezone));
+            RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(max_column, max_values, *_col_type,
+                                                                       get_column_parquet_field(), _opts.timezone));
+
+            zone_map_detail = ZoneMapDetail{min_column->get(0), max_column->get(0), has_null};
+            zone_map_detail->set_num_rows(rg_num_rows);
+        }
+    }
+
+    if (!zone_map_detail.has_value()) {
+        // ZoneMapDetail is not set, which means select all
+        row_ranges->add({rg_first_row, rg_end_row});
+        return Status::OK();
+    }
+
+    auto is_satisfy = [&](const ZoneMapDetail& detail) {
+        if (pred_relation == CompoundNodeType::AND) {
+            return std::ranges::all_of(predicates, [&](const auto* pred) { return pred->zone_map_filter(detail); });
+        } else {
+            return predicates.empty() ||
+                   std::ranges::any_of(predicates, [&](const auto* pred) { return pred->zone_map_filter(detail); });
+        }
+    };
+
+    if (is_satisfy(zone_map_detail.value())) {
+        row_ranges->add({rg_first_row, rg_end_row});
+    }
+
+    return Status::OK();
+}
+
 } // namespace starrocks::parquet
\ No newline at end of file
diff --git a/be/src/formats/parquet/scalar_column_reader.h b/be/src/formats/parquet/scalar_column_reader.h
index 7b16b81988e6b..9dfb8d877ce43 100644
--- a/be/src/formats/parquet/scalar_column_reader.h
+++ b/be/src/formats/parquet/scalar_column_reader.h
@@ -88,6 +88,10 @@ class ScalarColumnReader final : public ColumnReader {
 
     void select_offset_index(const SparseRange<uint64_t>& range, const uint64_t rg_first_row) override;
 
+    Status row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
+                                     SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
+                                     const uint64_t rg_first_row, const uint64_t rg_num_rows) const override;
+
 private:
     // Returns true if all of the data pages in the column chunk are dict encoded
     bool _column_all_pages_dict_encoded();
diff --git a/be/src/formats/parquet/statistics_helper.cpp b/be/src/formats/parquet/statistics_helper.cpp
index d411f68b35cf6..d0c7bbee67a45 100644
--- a/be/src/formats/parquet/statistics_helper.cpp
+++ b/be/src/formats/parquet/statistics_helper.cpp
@@ -274,4 +274,39 @@ Status StatisticsHelper::in_filter_on_min_max_stat(const std::vector<std::string
     return Status::OK();
 }
 
+Status StatisticsHelper::get_min_max_value(const FileMetaData* file_metadata, const TypeDescriptor& type,
+                                           const tparquet::ColumnMetaData* column_meta, const ParquetField* field,
+                                           std::vector<std::string>& min_values, std::vector<std::string>& max_values) {
+    // When statistics is empty, column_meta->__isset.statistics is still true,
+    // but statistics.__isset.xxx may be false, so judgment is required here.
+    bool is_set_min_max = (column_meta->statistics.__isset.max && column_meta->statistics.__isset.min) ||
+                          (column_meta->statistics.__isset.max_value && column_meta->statistics.__isset.min_value);
+    if (!is_set_min_max) {
+        return Status::Aborted("No exist min/max");
+    }
+
+    DCHECK_EQ(field->physical_type, column_meta->type);
+    auto sort_order = sort_order_of_logical_type(type.type);
+
+    if (!has_correct_min_max_stats(file_metadata, *column_meta, sort_order)) {
+        return Status::Aborted("The file has incorrect order");
+    }
+
+    if (column_meta->statistics.__isset.min_value) {
+        min_values.emplace_back(column_meta->statistics.min_value);
+        max_values.emplace_back(column_meta->statistics.max_value);
+    } else {
+        min_values.emplace_back(column_meta->statistics.min);
+        max_values.emplace_back(column_meta->statistics.max);
+    }
+
+    return Status::OK();
+}
+
+bool StatisticsHelper::has_correct_min_max_stats(const FileMetaData* file_metadata,
+                                                 const tparquet::ColumnMetaData& column_meta,
+                                                 const SortOrder& sort_order) {
+    return file_metadata->writer_version().HasCorrectStatistics(column_meta, sort_order);
+}
+
 } // namespace starrocks::parquet
\ No newline at end of file
diff --git a/be/src/formats/parquet/statistics_helper.h b/be/src/formats/parquet/statistics_helper.h
index 4f8dd20417206..5893fdd11aa9b 100644
--- a/be/src/formats/parquet/statistics_helper.h
+++ b/be/src/formats/parquet/statistics_helper.h
@@ -16,8 +16,8 @@
 
 #include "column/vectorized_fwd.h"
 #include "common/status.h"
-#include "exprs/expr_context.h"
 #include "exprs/in_const_predicate.hpp"
+#include "formats/parquet/metadata.h"
 #include "formats/parquet/schema.h"
 #include "runtime/types.h"
 
@@ -36,6 +36,14 @@ class StatisticsHelper {
     static Status in_filter_on_min_max_stat(const std::vector<std::string>& min_values,
                                             const std::vector<std::string>& max_values, ExprContext* ctx,
                                             const ParquetField* field, const std::string& timezone, Filter& selected);
+
+    // get min/max value from row group stats
+    static Status get_min_max_value(const FileMetaData* file_meta_data, const TypeDescriptor& type,
+                                    const tparquet::ColumnMetaData* column_meta, const ParquetField* field,
+                                    std::vector<std::string>& min_values, std::vector<std::string>& max_values);
+
+    static bool has_correct_min_max_stats(const FileMetaData* file_metadata,
+                                          const tparquet::ColumnMetaData& column_meta, const SortOrder& sort_order);
 };
 
 } // namespace starrocks::parquet
\ No newline at end of file
diff --git a/be/src/formats/parquet/zone_map_filter_evaluator.h b/be/src/formats/parquet/zone_map_filter_evaluator.h
new file mode 100644
index 0000000000000..8bf73fd009c2d
--- /dev/null
+++ b/be/src/formats/parquet/zone_map_filter_evaluator.h
@@ -0,0 +1,80 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include "formats/parquet/group_reader.h"
+#include "storage/predicate_tree/predicate_tree.h"
+
+namespace starrocks::parquet {
+
+enum class FilterLevel { ROW_GROUP = 0, PAGE_INDEX };
+
+template <FilterLevel level>
+struct ZoneMapEvaluator {
+    template <CompoundNodeType Type>
+    StatusOr<std::optional<SparseRange<uint64_t>>> operator()(const PredicateCompoundNode<Type>& node) {
+        std::optional<SparseRange<uint64_t>> row_ranges = std::nullopt;
+
+        const auto& ctx = pred_tree.compound_node_context(node.id());
+        const auto& cid_to_col_preds = ctx.cid_to_col_preds(node);
+
+        for (const auto& [cid, col_preds] : cid_to_col_preds) {
+            const auto* column_reader = group_reader->get_column_reader(cid);
+            if (column_reader == nullptr) {
+                // TODO: For a partition column, its column reader does not exist,
+                // so we don't support partition columns yet.
+                // E.g. WHERE a = 1 OR dt = '2012'
+                continue;
+            }
+            SparseRange<uint64_t> cur_row_ranges;
+            if (level == FilterLevel::ROW_GROUP) {
+                // TODO nullptr check
+                RETURN_IF_ERROR(group_reader->get_column_reader(cid)->row_group_zone_map_filter(
+                        col_preds, &cur_row_ranges, Type, group_reader->get_row_group_first_row(),
+                        group_reader->get_row_group_metadata()->num_rows));
+            } else {
+                return Status::InternalError("not supported yet");
+            }
+
+            merge_row_ranges<Type>(row_ranges, cur_row_ranges);
+        }
+
+        for (const auto& child : node.compound_children()) {
+            ASSIGN_OR_RETURN(auto cur_row_ranges_opt, child.visit(*this));
+            if (cur_row_ranges_opt.has_value()) {
+                merge_row_ranges<Type>(row_ranges, cur_row_ranges_opt.value());
+            }
+        }
+        return row_ranges;
+    }
+
+    template <CompoundNodeType Type>
+    static void merge_row_ranges(std::optional<SparseRange<uint64_t>>& dest, SparseRange<uint64_t>& source) {
+        if (!dest.has_value()) {
+            dest = std::move(source);
+        } else {
+            if constexpr (Type == CompoundNodeType::AND) {
+                dest.value() &= source;
+            } else {
+                dest.value() |= source;
+            }
+        }
+    }
+
+    const PredicateTree& pred_tree;
+    const GroupReaderPtr& group_reader;
+};
+
+} // namespace starrocks::parquet
\ No newline at end of file
diff --git a/be/src/storage/lake/tablet_reader.cpp b/be/src/storage/lake/tablet_reader.cpp
index d85357da80f66..74eae042423a3 100644
--- a/be/src/storage/lake/tablet_reader.cpp
+++ b/be/src/storage/lake/tablet_reader.cpp
@@ -393,7 +393,7 @@ Status TabletReader::init_delete_predicates(const TabletReaderParams& params, De
     if (UNLIKELY(_tablet_schema == nullptr)) {
         return Status::InternalError("tablet schema is null. forget or fail to call prepare()");
     }
-    PredicateParser pred_parser(_tablet_schema);
+    OlapPredicateParser pred_parser(_tablet_schema);
 
     for (int index = 0, size = _tablet_metadata->rowsets_size(); index < size; ++index) {
         const auto& rowset_metadata = _tablet_metadata->rowsets(index);
diff --git a/be/src/storage/predicate_parser.cpp b/be/src/storage/predicate_parser.cpp
index 25c3c662c2232..9daf982dbaa2b 100644
--- a/be/src/storage/predicate_parser.cpp
+++ b/be/src/storage/predicate_parser.cpp
@@ -27,14 +27,47 @@
 
 namespace starrocks {
 
-bool PredicateParser::can_pushdown(const ColumnPredicate* predicate) const {
+ColumnPredicate* PredicateParser::create_column_predicate(const TCondition& condition, TypeInfoPtr& type_info,
+                                                          ColumnId index) {
+    ColumnPredicate* pred = nullptr;
+    if ((condition.condition_op == "*=" || condition.condition_op == "=") && condition.condition_values.size() == 1) {
+        pred = new_column_eq_predicate(type_info, index, condition.condition_values[0]);
+    } else if ((condition.condition_op == "!*=" || condition.condition_op == "!=") &&
+               condition.condition_values.size() == 1) {
+        pred = new_column_ne_predicate(type_info, index, condition.condition_values[0]);
+    } else if (condition.condition_op == "<<") {
+        pred = new_column_lt_predicate(type_info, index, condition.condition_values[0]);
+    } else if (condition.condition_op == "<=") {
+        pred = new_column_le_predicate(type_info, index, condition.condition_values[0]);
+    } else if (condition.condition_op == ">>") {
+        pred = new_column_gt_predicate(type_info, index, condition.condition_values[0]);
+    } else if (condition.condition_op == ">=") {
+        pred = new_column_ge_predicate(type_info, index, condition.condition_values[0]);
+    } else if (condition.condition_op == "*=" && condition.condition_values.size() > 1) {
+        pred = new_column_in_predicate(type_info, index, condition.condition_values);
+    } else if (condition.condition_op == "!*=" && condition.condition_values.size() > 1) {
+        pred = new_column_not_in_predicate(type_info, index, condition.condition_values);
+    } else if (condition.condition_op == "!=" && condition.condition_values.size() > 1) {
+        pred = new_column_not_in_predicate(type_info, index, condition.condition_values);
+    } else if ((condition.condition_op.size() == 4 && strcasecmp(condition.condition_op.c_str(), " is ") == 0) ||
+               (condition.condition_op.size() == 2 && strcasecmp(condition.condition_op.c_str(), "is") == 0)) {
+        bool is_null = strcasecmp(condition.condition_values[0].c_str(), "null") == 0;
+        pred = new_column_null_predicate(type_info, index, is_null);
+    } else {
+        LOG(WARNING) << "unknown condition: " << condition.condition_op;
+        return pred;
+    }
+    return pred;
+}
+
+bool OlapPredicateParser::can_pushdown(const ColumnPredicate* predicate) const {
     RETURN_IF(predicate->column_id() >= _schema->num_columns(), false);
     const TabletColumn& column = _schema->column(predicate->column_id());
     return _schema->keys_type() == KeysType::PRIMARY_KEYS ||
            column.aggregation() == StorageAggregateType::STORAGE_AGGREGATE_NONE;
 }
 
-bool PredicateParser::can_pushdown(const SlotDescriptor* slot_desc) const {
+bool OlapPredicateParser::can_pushdown(const SlotDescriptor* slot_desc) const {
     const size_t index = _schema->field_index(slot_desc->col_name());
     CHECK(index <= _schema->num_columns());
     const TabletColumn& column = _schema->column(index);
@@ -54,11 +87,11 @@ struct CanPushDownVisitor {
     const PredicateParser* parent;
 };
 
-bool PredicateParser::can_pushdown(const ConstPredicateNodePtr& pred_tree) const {
+bool OlapPredicateParser::can_pushdown(const ConstPredicateNodePtr& pred_tree) const {
     return pred_tree.visit(CanPushDownVisitor{this});
 }
 
-ColumnPredicate* PredicateParser::parse_thrift_cond(const TCondition& condition) const {
+ColumnPredicate* OlapPredicateParser::parse_thrift_cond(const TCondition& condition) const {
     const size_t index = _schema->field_index(condition.column_name);
     RETURN_IF(index >= _schema->num_columns(), nullptr);
     const TabletColumn& col = _schema->column(index);
@@ -67,34 +100,8 @@ ColumnPredicate* OlapPredicateParser::parse_thrift_cond(const TCondition& condition)
     auto type = col.type();
     auto&& type_info = get_type_info(type, precision, scale);
 
-    ColumnPredicate* pred = nullptr;
-    if ((condition.condition_op == "*=" || condition.condition_op == "=") && condition.condition_values.size() == 1) {
-        pred = new_column_eq_predicate(type_info, index, condition.condition_values[0]);
-    } else if ((condition.condition_op == "!*=" || condition.condition_op == "!=") &&
-               condition.condition_values.size() == 1) {
-        pred = new_column_ne_predicate(type_info, index, condition.condition_values[0]);
-    } else if (condition.condition_op == "<<") {
-        pred = new_column_lt_predicate(type_info, index, condition.condition_values[0]);
-    } else if (condition.condition_op == "<=") {
-        pred = new_column_le_predicate(type_info, index, condition.condition_values[0]);
-    } else if (condition.condition_op == ">>") {
-        pred = new_column_gt_predicate(type_info, index, condition.condition_values[0]);
-    } else if (condition.condition_op == ">=") {
-        pred = new_column_ge_predicate(type_info, index, condition.condition_values[0]);
-    } else if (condition.condition_op == "*=" && condition.condition_values.size() > 1) {
-        pred = new_column_in_predicate(type_info, index, condition.condition_values);
-    } else if (condition.condition_op == "!*=" && condition.condition_values.size() > 1) {
-        pred = new_column_not_in_predicate(type_info, index, condition.condition_values);
-    } else if (condition.condition_op == "!=" && condition.condition_values.size() > 1) {
-        pred = new_column_not_in_predicate(type_info, index, condition.condition_values);
-    } else if ((condition.condition_op.size() == 4 && strcasecmp(condition.condition_op.c_str(), " is ") == 0) ||
-               (condition.condition_op.size() == 2 && strcasecmp(condition.condition_op.c_str(), "is") == 0)) {
-        bool is_null = strcasecmp(condition.condition_values[0].c_str(), "null") == 0;
-        pred = new_column_null_predicate(type_info, index, is_null);
-    } else {
-        LOG(WARNING) << "unknown condition: " << condition.condition_op;
-        return pred;
-    }
+    ColumnPredicate* pred = create_column_predicate(condition, type_info, index);
+    RETURN_IF(pred == nullptr, nullptr);
 
     if (type == TYPE_CHAR) {
         pred->padding_zeros(col.length());
@@ -102,8 +109,8 @@ ColumnPredicate* OlapPredicateParser::parse_thrift_cond(const TCondition& condition)
     return pred;
 }
 
-StatusOr<ColumnPredicate*> PredicateParser::parse_expr_ctx(const SlotDescriptor& slot_desc, RuntimeState* state,
-                                                           ExprContext* expr_ctx) const {
+StatusOr<ColumnPredicate*> OlapPredicateParser::parse_expr_ctx(const SlotDescriptor& slot_desc, RuntimeState* state,
+                                                               ExprContext* expr_ctx) const {
     const size_t column_id = _schema->field_index(slot_desc.col_name());
     RETURN_IF(column_id >= _schema->num_columns(), nullptr);
     const TabletColumn& col = _schema->column(column_id);
@@ -114,8 +121,69 @@ StatusOr<ColumnPredicate*> OlapPredicateParser::parse_expr_ctx(const SlotDescriptor& slot
     return ColumnExprPredicate::make_column_expr_predicate(type_info, column_id, state, expr_ctx, &slot_desc);
 }
 
-uint32_t PredicateParser::column_id(const SlotDescriptor& slot_desc) {
+uint32_t OlapPredicateParser::column_id(const SlotDescriptor& slot_desc) const {
     return _schema->field_index(slot_desc.col_name());
 }
 
+bool ConnectorPredicateParser::can_pushdown(const ColumnPredicate* predicate) const {
+    return false;
+}
+
+bool ConnectorPredicateParser::can_pushdown(const SlotDescriptor* slot_desc) const {
+    return false;
+}
+
+bool ConnectorPredicateParser::can_pushdown(const ConstPredicateNodePtr& pred_tree) const {
+    return false;
+}
+
+ColumnPredicate* ConnectorPredicateParser::parse_thrift_cond(const TCondition& condition) const {
+    uint8_t precision = 0;
+    uint8_t scale = 0;
+    LogicalType type = LogicalType::TYPE_UNKNOWN;
+    size_t index = 0;
+
+    for (const SlotDescriptor* slot : *_slot_desc) {
+        if (slot->col_name() == condition.column_name) {
+            type = slot->type().type;
+            precision = slot->type().precision;
+            scale = slot->type().scale;
+            index = slot->id();
+            break;
+        }
+    }
+    // column not found
+    RETURN_IF(type == LogicalType::TYPE_UNKNOWN, nullptr);
+
+    auto&& type_info = get_type_info(type, precision, scale);
+
+    return create_column_predicate(condition, type_info, index);
+}
+
+StatusOr<ColumnPredicate*> ConnectorPredicateParser::parse_expr_ctx(const SlotDescriptor& slot_desc,
+                                                                    RuntimeState* state, ExprContext* expr_ctx) const {
+    uint8_t precision = 0;
+    uint8_t scale = 0;
+    LogicalType type = LogicalType::TYPE_UNKNOWN;
+    size_t column_id = 0;
+
+    for (const SlotDescriptor* slot : *_slot_desc) {
+        if (slot->col_name() == slot_desc.col_name()) {
+            type = slot->type().type;
+            precision = slot->type().precision;
+            scale = slot->type().scale;
+            column_id = slot->id();
+            break;
+        }
+    }
+    RETURN_IF(type == LogicalType::TYPE_UNKNOWN, nullptr);
+
+    auto&& type_info = get_type_info(type, precision, scale);
+    return ColumnExprPredicate::make_column_expr_predicate(type_info, column_id, state, expr_ctx, &slot_desc);
+}
+
+uint32_t ConnectorPredicateParser::column_id(const SlotDescriptor& slot_desc) const {
+    return slot_desc.id();
+}
+
 } // namespace starrocks
diff --git a/be/src/storage/predicate_parser.h b/be/src/storage/predicate_parser.h
index b7c9f3c6c3ada..d5d20f3d6970f 100644
--- a/be/src/storage/predicate_parser.h
+++ b/be/src/storage/predicate_parser.h
@@ -33,26 +33,74 @@ class ColumnPredicate;
 
 class PredicateParser {
 public:
-    explicit PredicateParser(TabletSchemaCSPtr schema) : _schema(std::move(schema)) {}
+    virtual ~PredicateParser() = default;
 
+    // check if an expression can be pushed down to the storage level
+    virtual bool can_pushdown(const ColumnPredicate* predicate) const = 0;
+
+    virtual bool can_pushdown(const ConstPredicateNodePtr& pred_tree) const = 0;
+
+    virtual bool can_pushdown(const SlotDescriptor* slot_desc) const = 0;
+
+    // Parse |condition| into a predicate that can be pushed down.
+    // return nullptr if parse failed.
+    virtual ColumnPredicate* parse_thrift_cond(const TCondition& condition) const = 0;
+
+    virtual StatusOr<ColumnPredicate*> parse_expr_ctx(const SlotDescriptor& slot_desc, RuntimeState*,
+                                                      ExprContext* expr_ctx) const = 0;
+
+    virtual uint32_t column_id(const SlotDescriptor& slot_desc) const = 0;
+
+protected:
+    static ColumnPredicate* create_column_predicate(const TCondition& condition, TypeInfoPtr& type_info,
+                                                    ColumnId index);
+};
+
+class OlapPredicateParser final : public PredicateParser {
+public:
+    explicit OlapPredicateParser(TabletSchemaCSPtr schema) : _schema(std::move(schema)) {}
+    // explicit PredicateParser(const std::vector<SlotDescriptor*>* slot_descriptors) : _slot_desc(slot_descriptors) {}
 
     // check if an expression can be pushed down to the storage level
-    bool can_pushdown(const ColumnPredicate* predicate) const;
+    bool can_pushdown(const ColumnPredicate* predicate) const override;
 
-    bool can_pushdown(const ConstPredicateNodePtr& pred_tree) const;
+    bool can_pushdown(const ConstPredicateNodePtr& pred_tree) const override;
 
-    bool can_pushdown(const SlotDescriptor* slot_desc) const;
+    bool can_pushdown(const SlotDescriptor* slot_desc) const override;
 
     // Parse |condition| into a predicate that can be pushed down.
     // return nullptr if parse failed.
-    ColumnPredicate* parse_thrift_cond(const TCondition& condition) const;
+    ColumnPredicate* parse_thrift_cond(const TCondition& condition) const override;
+
+    StatusOr<ColumnPredicate*> parse_expr_ctx(const SlotDescriptor& slot_desc, RuntimeState*,
+                                              ExprContext* expr_ctx) const override;
+
+    uint32_t column_id(const SlotDescriptor& slot_desc) const override;
+
+private:
+    const TabletSchemaCSPtr _schema = nullptr;
+    // const std::vector<SlotDescriptor*>* _slot_desc = nullptr;
+};
+
+class ConnectorPredicateParser final : public PredicateParser {
+public:
+    explicit ConnectorPredicateParser(const std::vector<SlotDescriptor*>* slot_descriptors)
+            : _slot_desc(slot_descriptors) {}
+
+    bool can_pushdown(const ColumnPredicate* predicate) const override;
+
+    bool can_pushdown(const ConstPredicateNodePtr& pred_tree) const override;
+
+    bool can_pushdown(const SlotDescriptor* slot_desc) const override;
+
+    ColumnPredicate* parse_thrift_cond(const TCondition& condition) const override;
 
     StatusOr<ColumnPredicate*> parse_expr_ctx(const SlotDescriptor& slot_desc, RuntimeState*,
-                                              ExprContext* expr_ctx) const;
+                                              ExprContext* expr_ctx) const override;
 
-    uint32_t column_id(const SlotDescriptor& slot_desc);
+    uint32_t column_id(const SlotDescriptor& slot_desc) const override;
 
 private:
-    const TabletSchemaCSPtr _schema;
+    const std::vector<SlotDescriptor*>* _slot_desc = nullptr;
 };
 
 } // namespace starrocks
diff --git a/be/src/storage/tablet_reader.cpp b/be/src/storage/tablet_reader.cpp
index 29d9c6054bd36..918c706c1e7c5 100644
--- a/be/src/storage/tablet_reader.cpp
+++ b/be/src/storage/tablet_reader.cpp
@@ -564,7 +564,7 @@ Status TabletReader::_init_predicates(const TabletReaderParams& params) {
 }
 
 Status TabletReader::_init_delete_predicates(const TabletReaderParams& params, DeletePredicates* dels) {
-    PredicateParser pred_parser(_tablet_schema);
+    OlapPredicateParser pred_parser(_tablet_schema);
     std::shared_lock header_lock(_tablet->get_header_lock());
 
     for (const DeletePredicatePB& pred_pb : _tablet->delete_predicates()) {

From df861f6c4acf19548338acc5166e3e10d4088c2e Mon Sep 17 00:00:00 2001
From: Smith Cruise
Date: Fri, 15 Nov 2024 10:46:40 +0800
Subject: [PATCH 2/7] improve

Signed-off-by: Smith Cruise
---
 be/src/exec/hdfs_scanner_parquet.cpp            | 6 ++++--
 be/src/formats/parquet/scalar_column_reader.cpp | 7 ++++---
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/be/src/exec/hdfs_scanner_parquet.cpp b/be/src/exec/hdfs_scanner_parquet.cpp
index a20323f9693b4..b45fee41fd1eb 100644
--- a/be/src/exec/hdfs_scanner_parquet.cpp
+++ b/be/src/exec/hdfs_scanner_parquet.cpp
@@ -18,6 +18,7 @@
 #include "exec/iceberg/iceberg_delete_builder.h"
 #include "exec/paimon/paimon_delete_file_builder.h"
 #include "formats/parquet/file_reader.h"
+#include "pipeline/fragment_context.h"
 #include "util/runtime_profile.h"
 
 namespace starrocks {
@@ -160,8 +161,9 @@ void HdfsParquetScanner::do_update_counter(HdfsScanProfile* profile) {
     COUNTER_UPDATE(page_index_timer, _app_stats.page_index_ns);
     COUNTER_UPDATE(total_row_groups, _app_stats.parquet_total_row_groups);
     COUNTER_UPDATE(filtered_row_groups, _app_stats.parquet_filtered_row_groups);
-    if (_scanner_ctx.conjuncts_manager != nullptr) {
-        root->add_info_string("ParquetZoneMapFilter", _scanner_ctx.predicate_tree.root().debug_string());
+    if (_scanner_ctx.conjuncts_manager != nullptr &&
+        _runtime_state->fragment_ctx()->pred_tree_params().enable_show_in_profile) {
+        root->add_info_string("ParquetPredicateTreeFilter", _scanner_ctx.predicate_tree.root().debug_string());
     }
 }
 
diff --git a/be/src/formats/parquet/scalar_column_reader.cpp b/be/src/formats/parquet/scalar_column_reader.cpp
index 9cec9d0e59895..35eb43c8edc40 100644
--- a/be/src/formats/parquet/scalar_column_reader.cpp
+++ b/be/src/formats/parquet/scalar_column_reader.cpp
@@ -275,6 +275,10 @@ Status ScalarColumnReader::row_group_zone_map_filter(const std::vector<const Col
     }
 
     std::optional<ZoneMapDetail> zone_map_detail = std::nullopt;
+
+    // used to hold min/max slice values
+    const ColumnPtr min_column = ColumnHelper::create_column(*_col_type, true);
+    const ColumnPtr max_column = ColumnHelper::create_column(*_col_type, true);
     if (is_all_null) {
         // if the entire column's value is null, the min/max value does not exist
         zone_map_detail = ZoneMapDetail{Datum{}, Datum{}, true};
@@ -286,9 +290,6 @@ Status ScalarColumnReader::row_group_zone_map_filter(const std::vector<const Col
                 StatisticsHelper::get_min_max_value(_opts.file_meta_data, *_col_type, &get_chunk_metadata()->meta_data,
                                                     get_column_parquet_field(), min_values, max_values);
         if (st.ok()) {
-            const ColumnPtr min_column = ColumnHelper::create_column(*_col_type, true);
-            const ColumnPtr max_column = ColumnHelper::create_column(*_col_type, true);
-
             RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(min_column, min_values, *_col_type,
                                                                        get_column_parquet_field(), _opts.timezone));
             RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(max_column, max_values, *_col_type,

From 6a901dd7073a04764660e30a1084d5381edc2b6f Mon Sep 17 00:00:00 2001
From: Smith Cruise
Date: Fri, 15 Nov 2024 11:03:15 +0800
Subject: [PATCH 3/7] improve

Signed-off-by: Smith Cruise
---
 be/src/formats/parquet/file_reader.cpp | 40 +++-----------------------
 be/src/formats/parquet/file_reader.h   |  7 -----
 2 files changed, 4 insertions(+), 43 deletions(-)

diff --git a/be/src/formats/parquet/file_reader.cpp b/be/src/formats/parquet/file_reader.cpp
index e5b79330cbacc..b02f7672a2a18 100644
--- a/be/src/formats/parquet/file_reader.cpp
+++ b/be/src/formats/parquet/file_reader.cpp
@@ -306,7 +306,8 @@ bool FileReader::_filter_group_with_more_filter(const GroupReaderPtr& group_read
             LOG(WARNING) << "Can't get " + slot->col_name() + "'s ParquetField in _read_min_max_chunk.";
             continue;
         }
-        auto st = _get_min_max_value(slot, column_meta, field, min_values, max_values);
+        auto st = StatisticsHelper::get_min_max_value(_file_metadata.get(), slot->type(), column_meta, field,
+                                                      min_values, max_values);
         if (!st.ok()) continue;
         Filter selected(min_values.size(), 1);
         st = StatisticsHelper::in_filter_on_min_max_stat(min_values, max_values, ctx, field,
@@ -395,7 +396,8 @@ Status FileReader::_read_min_max_chunk(const GroupReaderPtr& group_reader, const
             return Status::InternalError(strings::Substitute("Can't get $0 field", slot->col_name()));
         }
 
-        RETURN_IF_ERROR(_get_min_max_value(slot, column_meta, field, min_values, max_values));
+        RETURN_IF_ERROR(StatisticsHelper::get_min_max_value(_file_metadata.get(), slot->type(), column_meta, field,
+                                                            min_values, max_values));
         RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column((*min_chunk)->columns()[i], min_values,
                                                                    slot->type(), field, ctx.timezone));
         RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column((*max_chunk)->columns()[i], max_values,
@@ -415,40 +417,6 @@ int32_t FileReader::_get_partition_column_idx(const std::string& col_name) const
     return -1;
 }
 
-Status FileReader::_get_min_max_value(const SlotDescriptor* slot, const tparquet::ColumnMetaData* column_meta,
-                                      const ParquetField* field, std::vector<std::string>& min_values,
-                                      std::vector<std::string>& max_values) const {
-    // When statistics is empty, column_meta->__isset.statistics is still true,
-    // but statistics.__isset.xxx may be false, so judgment is required here.
-    bool is_set_min_max = (column_meta->statistics.__isset.max && column_meta->statistics.__isset.min) ||
-                          (column_meta->statistics.__isset.max_value && column_meta->statistics.__isset.min_value);
-    if (!is_set_min_max) {
-        return Status::Aborted("No exist min/max");
-    }
-
-    DCHECK_EQ(field->physical_type, column_meta->type);
-    auto sort_order = sort_order_of_logical_type(slot->type().type);
-
-    if (!_has_correct_min_max_stats(*column_meta, sort_order)) {
-        return Status::Aborted("The file has incorrect order");
-    }
-
-    if (column_meta->statistics.__isset.min_value) {
-        min_values.emplace_back(column_meta->statistics.min_value);
-        max_values.emplace_back(column_meta->statistics.max_value);
-    } else {
-        min_values.emplace_back(column_meta->statistics.min);
-        max_values.emplace_back(column_meta->statistics.max);
-    }
-
-    return Status::OK();
-}
-
-bool FileReader::_has_correct_min_max_stats(const tparquet::ColumnMetaData& column_meta,
-                                            const SortOrder& sort_order) const {
-    return _file_metadata->writer_version().HasCorrectStatistics(column_meta, sort_order);
-}
-
 void FileReader::_prepare_read_columns(std::unordered_set<std::string>& existed_column_names) {
     _meta_helper->prepare_read_columns(_scanner_ctx->materialized_columns, _group_reader_param.read_cols,
                                        existed_column_names);
diff --git a/be/src/formats/parquet/file_reader.h b/be/src/formats/parquet/file_reader.h
index 5f5f12de54a7c..8901047ab1606 100644
--- a/be/src/formats/parquet/file_reader.h
+++ b/be/src/formats/parquet/file_reader.h
@@ -125,13 +125,6 @@ class FileReader {
     // Validate the magic bytes and get the length of metadata
     StatusOr<uint32_t> _parse_metadata_length(const std::vector<char>& footer_buff) const;
 
-    // get min/max value from row group stats
-    Status _get_min_max_value(const SlotDescriptor* slot, const tparquet::ColumnMetaData* column_meta,
-                              const ParquetField* field, std::vector<std::string>& min_values,
-                              std::vector<std::string>& max_values) const;
-
-    bool _has_correct_min_max_stats(const tparquet::ColumnMetaData& column_meta, const SortOrder& sort_order) const;
-
     Status _build_split_tasks();
 
     RandomAccessFile* _file = nullptr;

From 901ffec7fa516371580e4f46214506e6b10228ee Mon Sep 17 00:00:00 2001
From: Smith Cruise
Date: Fri, 15 Nov 2024 15:02:12 +0800
Subject: [PATCH 4/7] update

Signed-off-by: Smith Cruise
---
 be/src/exec/hdfs_scanner.cpp                    | 1 +
 be/src/exec/hdfs_scanner.h                      | 4 +---
 be/test/storage/conjunctive_predicates_test.cpp | 2 +-
 3 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/hdfs_scanner.cpp b/be/src/exec/hdfs_scanner.cpp
index 0833634417c0d..d4c47f76e9b20 100644
--- a/be/src/exec/hdfs_scanner.cpp
+++ b/be/src/exec/hdfs_scanner.cpp
@@ -88,6 +88,7 @@ Status HdfsScanner::init(RuntimeState* runtime_state, const HdfsScannerParams& s
     _scanner_params = scanner_params;
 
     RETURN_IF_ERROR(do_init(runtime_state, scanner_params));
+
     return Status::OK();
 }
 
diff --git a/be/src/exec/hdfs_scanner.h b/be/src/exec/hdfs_scanner.h
index 22c9c7a7a9fd3..f4359d428a8e1 100644
--- a/be/src/exec/hdfs_scanner.h
+++ b/be/src/exec/hdfs_scanner.h
@@ -345,8 +345,7 @@ struct HdfsScannerContext {
 
     // used for parquet zone map filter only
     std::unique_ptr<OlapScanConjunctsManager> conjuncts_manager = nullptr;
-    using PredicatePtr = std::unique_ptr<ColumnPredicate>;
-    std::vector<PredicatePtr> predicate_free_pool;
+    std::vector<std::unique_ptr<ColumnPredicate>> predicate_free_pool;
     PredicateTree predicate_tree;
 };
 
@@ -408,7 +407,6 @@ class HdfsScanner {
     std::atomic<bool> _closed = false;
     Status _build_scanner_context();
     void update_hdfs_counter(HdfsScanProfile* profile);
-    std::unique_ptr<OlapScanConjunctsManager> _conjuncts_manager;
 
 protected:
     HdfsScannerContext _scanner_ctx;
diff --git a/be/test/storage/conjunctive_predicates_test.cpp b/be/test/storage/conjunctive_predicates_test.cpp
index 54a3d3e1a0c5f..3d0bd87df6019 100644
--- a/be/test/storage/conjunctive_predicates_test.cpp
+++ b/be/test/storage/conjunctive_predicates_test.cpp
@@ -373,7 +373,7 @@ TEST_P(ConjunctiveTestFixture, test_parse_conjuncts) {
     OlapScanConjunctsManager cm(std::move(opts));
     ASSERT_OK(cm.parse_conjuncts());
 
-    PredicateParser parser(tablet_schema);
+    OlapPredicateParser parser(tablet_schema);
     ColumnPredicatePtrs col_preds_owner;
     auto status_or_pred_tree = cm.get_predicate_tree(&parser, col_preds_owner);
     ASSERT_OK(status_or_pred_tree);

From a7c6ca29826a8c779801799f4dc3edfe18925832 Mon Sep 17 00:00:00 2001
From: Smith Cruise
Date: Fri, 15 Nov 2024 15:20:36 +0800
Subject: [PATCH 5/7] update

Signed-off-by: Smith Cruise
---
 be/src/exec/hdfs_scanner.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/be/src/exec/hdfs_scanner.cpp b/be/src/exec/hdfs_scanner.cpp
index d4c47f76e9b20..a804ccb8bbe55 100644
--- a/be/src/exec/hdfs_scanner.cpp
+++ b/be/src/exec/hdfs_scanner.cpp
@@ -21,6 +21,7 @@
 #include "io/cache_select_input_stream.hpp"
 #include "io/compressed_input_stream.h"
 #include "io/shared_buffered_input_stream.h"
+#include "pipeline/fragment_context.h"
 #include "storage/predicate_parser.h"
 #include "util/compression/compression_utils.h"
 #include "util/compression/stream_compression.h"
@@ -179,7 +180,7 @@ Status HdfsScanner::_build_scanner_context() {
         opts.runtime_state = _runtime_state;
         opts.enable_column_expr_predicate = true;
         opts.is_olap_scan = false;
-        opts.pred_tree_params.enable_or = true;
+        opts.pred_tree_params = _runtime_state->fragment_ctx()->pred_tree_params();
         ctx.conjuncts_manager = std::make_unique<OlapScanConjunctsManager>(std::move(opts));
         RETURN_IF_ERROR(ctx.conjuncts_manager->parse_conjuncts());
         ConnectorPredicateParser predicate_parser{&ctx.slot_descs};

From ffc23a0822864d6bd46cff43357472dd68d5dd15 Mon Sep 17 00:00:00 2001
From: Smith Cruise
Date: Fri, 15 Nov 2024 16:11:22 +0800
Subject: [PATCH 6/7] rm

Signed-off-by: Smith Cruise
---
 be/src/formats/parquet/file_reader.h | 2 --
 1 file changed, 2 deletions(-)

diff --git a/be/src/formats/parquet/file_reader.h b/be/src/formats/parquet/file_reader.h
index 8901047ab1606..c83e6e4c1dfa6 100644
--- a/be/src/formats/parquet/file_reader.h
+++ b/be/src/formats/parquet/file_reader.h
@@ -111,8 +111,6 @@ class FileReader {
     Status _read_min_max_chunk(const GroupReaderPtr& group_reader, const std::vector<SlotDescriptor*>& slots,
                                ChunkPtr* min_chunk, ChunkPtr* max_chunk) const;
 
-    // StatusOr> _read_zone_map_from_row_group(const tparquet::RowGroup& row_group, const std::vector& slot);
-
     // only scan partition column + not exist column
     Status _exec_no_materialized_column_scan(ChunkPtr* chunk);
 

From 78f19c09c907f3589914ef705bfeceba9e4616df Mon Sep 17 00:00:00 2001
From: Smith Cruise
Date: Mon, 18 Nov 2024 20:30:19 +0800
Subject: [PATCH 7/7] fix ut

Signed-off-by: Smith Cruise
---
 be/src/connector/hive_connector.cpp           | 31 +++++++++++--------
 be/src/connector/hive_connector.h             |  3 +-
 be/src/formats/parquet/file_reader.cpp        |  5 +--
 be/src/formats/parquet/group_reader.cpp       |  2 +-
 be/src/formats/parquet/group_reader.h         |  2 +-
 .../parquet/zone_map_filter_evaluator.h       |  2 +-
 be/test/exec/hdfs_scan_node_test.cpp          |  4 +++
 be/test/exec/hdfs_scanner_test.cpp            |  5 +++
 8 files changed, 35 insertions(+), 19 deletions(-)

diff --git a/be/src/connector/hive_connector.cpp b/be/src/connector/hive_connector.cpp
index 490d1369b5e1a..09bd40fe7935c 100644
--- a/be/src/connector/hive_connector.cpp
+++ b/be/src/connector/hive_connector.cpp
@@ -218,19 +218,7 @@ Status HiveDataSource::_init_conjunct_ctxs(RuntimeState* state) {
     _update_has_any_predicate();
 
     RETURN_IF_ERROR(_decompose_conjunct_ctxs(state));
-    {
-        std::vector<ExprContext*> cloned_conjunct_ctxs;
-        RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _min_max_conjunct_ctxs, &cloned_conjunct_ctxs));
-        for (auto* ctx : cloned_conjunct_ctxs) {
-            _all_conjunct_ctxs.emplace_back(ctx);
-        }
-
-        cloned_conjunct_ctxs.clear();
-        RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _conjunct_ctxs, &cloned_conjunct_ctxs));
-        for (auto* ctx : cloned_conjunct_ctxs) {
-            _all_conjunct_ctxs.emplace_back(ctx);
-        }
-    }
+    RETURN_IF_ERROR(_setup_all_conjunct_ctxs(state));
     return Status::OK();
 }
 
@@ -459,6 +447,23 @@ Status HiveDataSource::_decompose_conjunct_ctxs(RuntimeState* state) {
     return Status::OK();
 }
 
+Status HiveDataSource::_setup_all_conjunct_ctxs(RuntimeState* state) {
+    // clone conjuncts from _min_max_conjunct_ctxs & _conjunct_ctxs;
+    // then we will generate the PredicateTree based on _all_conjunct_ctxs
+    std::vector<ExprContext*> cloned_conjunct_ctxs;
+    RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _min_max_conjunct_ctxs, &cloned_conjunct_ctxs));
+    for (auto* ctx : cloned_conjunct_ctxs) {
+        _all_conjunct_ctxs.emplace_back(ctx);
+    }
+
+    cloned_conjunct_ctxs.clear();
+    RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _conjunct_ctxs, &cloned_conjunct_ctxs));
+    for (auto* ctx : cloned_conjunct_ctxs) {
+        _all_conjunct_ctxs.emplace_back(ctx);
+    }
+    return Status::OK();
+}
+
 void HiveDataSource::_init_counter(RuntimeState* state) {
     const auto& hdfs_scan_node = _provider->_hdfs_scan_node;
 
diff --git a/be/src/connector/hive_connector.h b/be/src/connector/hive_connector.h
index f2aa2e6d181a3..50f097434fd8c 100644
--- a/be/src/connector/hive_connector.h
+++ b/be/src/connector/hive_connector.h
@@ -92,6 +92,7 @@ class HiveDataSource final : public DataSource {
     Status _init_conjunct_ctxs(RuntimeState* state);
     void _update_has_any_predicate();
     Status _decompose_conjunct_ctxs(RuntimeState* state);
+    Status _setup_all_conjunct_ctxs(RuntimeState* state);
     void _init_tuples_and_slots(RuntimeState* state);
     void _init_counter(RuntimeState* state);
     void _init_rf_counters();
@@ -119,7 +120,7 @@ class HiveDataSource final : public DataSource {
 
     // ============ conjuncts =================
     std::vector<ExprContext*> _min_max_conjunct_ctxs;
-    // whole conjuncts, used to generate PredicateTree
+    // contains all conjuncts, used to generate the PredicateTree
     std::vector<ExprContext*> _all_conjunct_ctxs{};
     // complex conjuncts, such as contains multi slot, are evaled in scanner.
     std::vector<ExprContext*> _scanner_conjunct_ctxs;
diff --git a/be/src/formats/parquet/file_reader.cpp b/be/src/formats/parquet/file_reader.cpp
index b02f7672a2a18..4ed6d4898b5fc 100644
--- a/be/src/formats/parquet/file_reader.cpp
+++ b/be/src/formats/parquet/file_reader.cpp
@@ -326,14 +326,15 @@ bool FileReader::_filter_group_with_more_filter(const GroupReaderPtr& group_read
 // when doing row group filter, there maybe some error, but we'd better just ignore it instead of returning the error
 // status and lead to the query failed.
 bool FileReader::_filter_group(const GroupReaderPtr& group_reader) {
-    if (_scanner_ctx->conjuncts_manager != nullptr) {
+    if (config::parquet_advance_zonemap_filter) {
         auto res = _scanner_ctx->predicate_tree.visit(
-                ZoneMapEvaluator<FilterLevel::ROW_GROUP>{_scanner_ctx->predicate_tree, group_reader});
+                ZoneMapEvaluator<FilterLevel::ROW_GROUP>{_scanner_ctx->predicate_tree, group_reader.get()});
         if (!res.ok()) {
             LOG(WARNING) << "filter row group failed: " << res.status().message();
             return false;
         }
         if (res.value().has_value() && res.value()->empty()) {
+            // no rows selected, the whole row group can be filtered
             return true;
         }
         return false;
diff --git a/be/src/formats/parquet/group_reader.cpp b/be/src/formats/parquet/group_reader.cpp
index ed8c541c59a84..a1b3f75fd7e18 100644
--- a/be/src/formats/parquet/group_reader.cpp
+++ b/be/src/formats/parquet/group_reader.cpp
@@ -133,7 +133,7 @@ const tparquet::ColumnChunk* GroupReader::get_chunk_metadata(SlotId slot_id) {
     return it->second->get_chunk_metadata();
 }
 
-const ColumnReader* GroupReader::get_column_reader(SlotId slot_id) {
+ColumnReader* GroupReader::get_column_reader(SlotId slot_id) {
     const auto& it = _column_readers.find(slot_id);
     if (it == _column_readers.end()) {
         return nullptr;
diff --git a/be/src/formats/parquet/group_reader.h b/be/src/formats/parquet/group_reader.h
index ca84a7a9998dd..dc8196ff96b44 100644
--- a/be/src/formats/parquet/group_reader.h
+++ b/be/src/formats/parquet/group_reader.h
@@ -112,7 +112,7 @@ class GroupReader {
     Status prepare();
     const tparquet::ColumnChunk* get_chunk_metadata(SlotId slot_id);
     const ParquetField* get_column_parquet_field(SlotId slot_id);
-    const ColumnReader* get_column_reader(SlotId slot_id);
+    ColumnReader* get_column_reader(SlotId slot_id);
     uint64_t get_row_group_first_row() const { return _row_group_first_row; }
     const tparquet::RowGroup* get_row_group_metadata() const;
     Status get_next(ChunkPtr* chunk, size_t* row_count);
diff --git a/be/src/formats/parquet/zone_map_filter_evaluator.h b/be/src/formats/parquet/zone_map_filter_evaluator.h
index 8bf73fd009c2d..d813113cba6a4 100644
--- a/be/src/formats/parquet/zone_map_filter_evaluator.h
+++ b/be/src/formats/parquet/zone_map_filter_evaluator.h
@@ -74,7 +74,7 @@ struct ZoneMapEvaluator {
     }
 
     const PredicateTree& pred_tree;
-    const GroupReaderPtr& group_reader;
+    GroupReader* group_reader;
 };
 
 } // namespace starrocks::parquet
\ No newline at end of file
diff --git a/be/test/exec/hdfs_scan_node_test.cpp b/be/test/exec/hdfs_scan_node_test.cpp
index 63969fb67dc03..d5b7243ab8a50 100644
--- a/be/test/exec/hdfs_scan_node_test.cpp
+++ b/be/test/exec/hdfs_scan_node_test.cpp
@@ -19,6 +19,7 @@
 
 #include "column/column_helper.h"
 #include "exec/connector_scan_node.h"
+#include "exec/pipeline/fragment_context.h"
 #include "runtime/descriptor_helper.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
@@ -140,6 +141,9 @@ void HdfsScanNodeTest::_create_runtime_state() {
     TUniqueId id;
     _mem_tracker = std::make_shared<MemTracker>(-1, "olap scanner test");
     _runtime_state->init_mem_trackers(id);
+    pipeline::FragmentContext* fragment_context = _runtime_state->obj_pool()->add(new pipeline::FragmentContext());
+    fragment_context->set_pred_tree_params({true, true});
+    _runtime_state->set_fragment_ctx(fragment_context);
 }
 
 std::shared_ptr<TPlanNode> HdfsScanNodeTest::_create_tplan_node() {
diff --git a/be/test/exec/hdfs_scanner_test.cpp b/be/test/exec/hdfs_scanner_test.cpp
index ef8de982bcc0e..79ec95d847ed6 100644
--- a/be/test/exec/hdfs_scanner_test.cpp
+++ b/be/test/exec/hdfs_scanner_test.cpp
@@ -24,6 +24,7 @@
 #include "exec/hdfs_scanner_parquet.h"
 #include "exec/hdfs_scanner_text.h"
 #include "exec/jni_scanner.h"
+#include "exec/pipeline/fragment_context.h"
 #include "runtime/descriptor_helper.h"
 #include "runtime/runtime_state.h"
 #include "storage/chunk_helper.h"
@@ -80,6 +81,9 @@ void HdfsScannerTest::_create_runtime_state(const std::string& timezone) {
     }
     _runtime_state = _pool.add(new RuntimeState(fragment_id, query_options, query_globals, nullptr));
     _runtime_state->init_instance_mem_tracker();
+    pipeline::FragmentContext* fragment_context = _pool.add(new pipeline::FragmentContext());
+    fragment_context->set_pred_tree_params({true, true});
+    _runtime_state->set_fragment_ctx(fragment_context);
 }
 
 Status HdfsScannerTest::_init_datacache(size_t mem_size, const std::string& engine) {
@@ -116,6 +120,7 @@ HdfsScannerParams* HdfsScannerTest::_create_param(const std::string& file, THdfs
     param->file_size = range->file_length;
     param->scan_range = range;
     param->tuple_desc = tuple_desc;
+    param->runtime_filter_collector = _pool.add(new RuntimeFilterProbeCollector());
     std::vector<int> materialize_index_in_chunk;
     std::vector<int> partition_index_in_chunk;
     std::vector<SlotDescriptor*> mat_slots;