From 9798c012108e12edaff320221ad7526f2cb901f8 Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Fri, 18 Aug 2023 09:57:51 +0800
Subject: [PATCH] [Improvement] Optimize count operation for iceberg (#22923)

Iceberg maintains its own metadata, which includes row-count statistics
for table data. If the table contains no equality deletes, the count for
the current table can be read directly from these statistics.
---
 be/src/vec/exec/format/generic_reader.h        |  1 +
 .../vec/exec/format/table/iceberg_reader.cpp   | 33 +++++++++-
 be/src/vec/exec/format/table/iceberg_reader.h  |  6 +-
 be/src/vec/exec/scan/vfile_scanner.cpp         |  6 +-
 be/src/vec/exec/scan/vscan_node.cpp            |  5 ++
 be/src/vec/exec/scan/vscan_node.h              |  6 ++
 .../org/apache/doris/planner/PlanNode.java     |  4 ++
 .../external/iceberg/IcebergScanNode.java      | 63 ++++++++++++++++++-
 .../external/iceberg/IcebergSplit.java         |  4 +-
 gensrc/thrift/PlanNodes.thrift                 |  2 +
 10 files changed, 121 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index 7b6f3c7b9c733a6..7842f2edb920592 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -36,6 +36,7 @@ class GenericReader {
     void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
         _push_down_agg_type = push_down_agg_type;
     }
+
     virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;
 
     virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() {
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 6d2f57258680cc5..1b0f05cc950feec 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -90,14 +90,15 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
                                        RuntimeProfile* profile, RuntimeState* state,
                                        const TFileScanRangeParams& params,
                                        const TFileRangeDesc& range, ShardedKVCache* kv_cache,
-                                       io::IOContext* io_ctx)
+                                       io::IOContext* io_ctx, int64_t push_down_count)
         : TableFormatReader(std::move(file_format_reader)),
           _profile(profile),
           _state(state),
           _params(params),
           _range(range),
           _kv_cache(kv_cache),
-          _io_ctx(io_ctx) {
+          _io_ctx(io_ctx),
+          _remaining_push_down_count(push_down_count) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
     _iceberg_profile.num_delete_files =
@@ -132,10 +133,27 @@ Status IcebergTableReader::init_reader(
             _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
             not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+
     return status;
 }
 
 Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    // The row count was already computed from Iceberg metadata; fill empty blocks instead of reading the file.
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
+        auto rows =
+                std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size);
+        _remaining_push_down_count -= rows;
+        for (auto& col : block->mutate_columns()) {
+            col->resize(rows);
+        }
+        *read_rows = rows;
+        if (_remaining_push_down_count == 0) {
+            *eof = true;
+        }
+
+        return Status::OK();
+    }
+
     // To support iceberg schema evolution, we change the column names in the block
     // to match the column names in the parquet file before reading data, and set
     // them back to the table column names before returning the block.
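The fast path added above answers a pushed-down count of N by fabricating blocks whose row counts sum to N, capped per block by the session's batch_size; COUNT(*) never inspects column values, so only the row count matters. A minimal Java sketch of the same chunking arithmetic (the class and method names here are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    class CountChunks {
        // Mirrors the loop in IcebergTableReader::get_next_block: split the
        // pushed-down count into batch-sized chunks until it is exhausted.
        static List<Long> chunk(long pushDownCount, long batchSize) {
            List<Long> blockRows = new ArrayList<>();
            long remaining = pushDownCount;
            while (remaining > 0) {
                long rows = Math.min(remaining, batchSize);
                remaining -= rows;
                blockRows.add(rows);
            }
            return blockRows;
        }
    }

For example, chunk(10000, 4064) yields [4064, 4064, 1872].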
@@ -149,6 +167,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
         }
         block->initialize_index_by_name();
     }
+
     auto res = _file_format_reader->get_next_block(block, read_rows, eof);
     // Set the names back to the table column names before returning the block.
     if (_has_schema_change) {
@@ -182,6 +201,11 @@ Status IcebergTableReader::get_columns(
 }
 
 Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
+    // The count comes from Iceberg metadata, so the delete files do not need to be read.
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
+        return Status::OK();
+    }
+
     auto& table_desc = range.table_format_params.iceberg_params;
     auto& version = table_desc.format_version;
     if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
@@ -192,10 +216,15 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
     if (files.empty()) {
         return Status::OK();
     }
+
     if (delete_file_type == POSITION_DELETE) {
         RETURN_IF_ERROR(_position_delete(files));
     }
+
+    // TODO: equality delete.
+    // If this is a count push-down and the split has equality delete files,
+    // the count push-down for this split must be cancelled.
+
     COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 451c51445ed9fce..8def49e68cfba72 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -69,8 +69,8 @@ class IcebergTableReader : public TableFormatReader {
     IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
                        RuntimeState* state, const TFileScanRangeParams& params,
-                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
-                       io::IOContext* io_ctx);
+                       const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
+                       int64_t push_down_count);
     ~IcebergTableReader() override = default;
 
     Status init_row_filters(const TFileRangeDesc& range) override;
@@ -154,6 +154,8 @@ class IcebergTableReader : public TableFormatReader {
     io::IOContext* _io_ctx;
     bool _has_schema_change = false;
     bool _has_iceberg_schema = false;
+
+    int64_t _remaining_push_down_count;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 6365f60ed6331c1..bc439b73d256e25 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -699,9 +699,9 @@ Status VFileScanner::_get_next_reader() {
             if (range.__isset.table_format_params &&
                 range.table_format_params.table_format_type == "iceberg") {
                 std::unique_ptr<IcebergTableReader> iceberg_reader =
-                        IcebergTableReader::create_unique(std::move(parquet_reader), _profile,
-                                                          _state, *_params, range, _kv_cache,
-                                                          _io_ctx.get());
+                        IcebergTableReader::create_unique(
+                                std::move(parquet_reader), _profile, _state, *_params, range,
+                                _kv_cache, _io_ctx.get(), _parent->get_push_down_count());
                 init_status = iceberg_reader->init_reader(
                         _file_col_names, _col_id_name_map, _colname_to_value_range,
                         _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
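The scanner above hands _parent->get_push_down_count() to the reader; the next two diffs add that accessor and the backing field to VScanNode, with -1 as the sentinel for "no count pushed down". Both BE fast paths therefore share one guard, restated here as a hedged Java sketch (enum values taken from TPushAggOp in PlanNodes.thrift; the helper class is illustrative):

    enum PushAggOp { NONE, MINMAX, COUNT, MIX }

    final class PushDownGuard {
        // Take the metadata fast path only when the plan pushed down COUNT
        // and the FE supplied a positive row count (-1 means none).
        static boolean useMetadataCount(PushAggOp op, long pushDownCount) {
            return op == PushAggOp.COUNT && pushDownCount > 0;
        }
    }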
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index c45524667cabcbf..e65bdab09302e5e 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -126,6 +126,11 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     } else {
        _push_down_agg_type = TPushAggOp::type::NONE;
     }
+
+    if (tnode.__isset.push_down_count) {
+        _push_down_count = tnode.push_down_count;
+    }
+
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 6b1922b0b8c0ee9..c023bbabe2a1761 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -136,6 +136,9 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
     }
 
     TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
+
+    int64_t get_push_down_count() { return _push_down_count; }
+
     // Get next block.
     // If eos is true, no more data will be read and block should be empty.
     Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
@@ -349,6 +352,9 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
 
     TPushAggOp::type _push_down_agg_type;
 
+    // Row count of a pushed-down COUNT, computed by the FE; -1 means no count was pushed down.
+    int64_t _push_down_count = -1;
+
 private:
     Status _normalize_conjuncts();
     Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 579b80586be2726..8dcc8043b447d39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1191,6 +1191,10 @@ public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) {
         this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp;
     }
 
+    public TPushAggOp getPushDownAggNoGroupingOp() {
+        return pushDownAggNoGroupingOp;
+    }
+
     public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
         return false;
     }
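The getter added above lets IcebergScanNode (next diff) read which aggregate, if any, was pushed down without grouping. The scan node opts in through two overrides: only COUNT qualifies, and only over non-nullable columns, because count(col) must skip NULLs while snapshot statistics count physical rows. A standalone sketch of those two checks, assuming the FE classes used in the diff below (the wrapper class itself is hypothetical):

    import org.apache.doris.analysis.FunctionCallExpr;
    import org.apache.doris.catalog.Column;

    final class CountPushDownChecks {
        static boolean functionQualifies(FunctionCallExpr aggExpr) {
            // Only COUNT can be answered from snapshot row counts.
            return "COUNT".equals(aggExpr.getFnName().getFunction().toUpperCase());
        }

        static boolean columnQualifies(Column col) {
            // count(col) over a nullable column must exclude NULLs,
            // which metadata row counts cannot do.
            return !col.isAllowNull();
        }
    }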
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 5fa64fa2b6a7e53..7293982ebbc3c62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -18,8 +18,10 @@
 package org.apache.doris.planner.external.iceberg;
 
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.ExternalTable;
@@ -43,6 +45,8 @@
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TIcebergDeleteFileDesc;
 import org.apache.doris.thrift.TIcebergFileDesc;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
 import avro.shaded.com.google.common.base.Preconditions;
@@ -55,6 +59,7 @@
 import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
@@ -68,6 +73,7 @@
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -79,6 +85,9 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
     public static final String DEFAULT_DATA_PATH = "/data/";
+    private static final String TOTAL_RECORDS = "total-records";
+    private static final String TOTAL_POSITION_DELETES = "total-position-deletes";
+    private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
 
     private IcebergSource source;
     private Table icebergTable;
@@ -210,8 +219,8 @@ public List<Split> getSplits() throws UserException {
                         splitTask.length(),
                         splitTask.file().fileSizeInBytes(),
                         new String[0],
+                        formatVersion,
                         source.getCatalog().getProperties());
-                split.setFormatVersion(formatVersion);
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
                 }
@@ -222,6 +231,12 @@ public List<Split> getSplits() throws UserException {
             throw new UserException(e.getMessage(), e.getCause());
         }
 
+        TPushAggOp aggOp = getPushDownAggNoGroupingOp();
+        if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
+            // The count comes from snapshot metadata, so a single split is enough; skip planning the rest.
+            return Collections.singletonList(splits.get(0));
+        }
+
         readPartitionNum = partitionPathSet.size();
 
         return splits;
@@ -334,4 +349,50 @@ public TableIf getTargetTable() {
     public Map<String, String> getLocationProperties() throws UserException {
         return source.getCatalog().getProperties();
     }
+
+    @Override
+    public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
+        String aggFunctionName = aggExpr.getFnName().getFunction().toUpperCase();
+        return "COUNT".equals(aggFunctionName);
+    }
+
+    @Override
+    public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) {
+        return !col.isAllowNull();
+    }
+
+    private long getCountFromSnapshot() {
+        Long specifiedSnapshot;
+        try {
+            specifiedSnapshot = getSpecifiedSnapshot();
+        } catch (UserException e) {
+            return -1;
+        }
+
+        Snapshot snapshot = specifiedSnapshot == null
+                ? icebergTable.currentSnapshot() : icebergTable.snapshot(specifiedSnapshot);
+
+        // empty table
+        if (snapshot == null) {
+            return -1;
+        }
+
+        Map<String, String> summary = snapshot.summary();
+        if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) {
+            return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    protected void toThrift(TPlanNode planNode) {
+        super.toThrift(planNode);
+        if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
+            long countFromSnapshot = getCountFromSnapshot();
+            if (countFromSnapshot > 0) {
+                planNode.setPushDownCount(countFromSnapshot);
+            }
+        }
+    }
 }
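The core of the FE change is getCountFromSnapshot above: Iceberg's snapshot summary already tracks total rows and deletes, and a position delete removes exactly one row, so the exact count is total-records minus total-position-deletes whenever no equality deletes exist (an equality delete can match any number of rows, which only a data scan can resolve). A minimal standalone sketch against the Iceberg Java API; unlike the patch, it also guards against missing summary keys, which is an addition here:

    import java.util.Map;

    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    final class SnapshotCount {
        // Returns the exact row count, or -1 when it cannot be derived
        // (no snapshot yet, missing stats, or equality deletes present).
        static long rowCount(Table table) {
            Snapshot snapshot = table.currentSnapshot();
            if (snapshot == null) {
                return -1;
            }
            Map<String, String> summary = snapshot.summary();
            String total = summary.get("total-records");
            String posDeletes = summary.get("total-position-deletes");
            String eqDeletes = summary.get("total-equality-deletes");
            if (total == null || posDeletes == null || !"0".equals(eqDeletes)) {
                return -1;
            }
            return Long.parseLong(total) - Long.parseLong(posDeletes);
        }
    }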
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index de3f2ec6aad5145..29deb293b3db75c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -27,10 +27,12 @@
 @Data
 public class IcebergSplit extends FileSplit {
 
+    // The file path will change if the file is modified, so there is no need to record the modification time.
     public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
-                        Map<String, String> config) {
+                        Integer formatVersion, Map<String, String> config) {
         super(file, start, length, fileLength, hosts, null);
+        this.formatVersion = formatVersion;
         this.config = config;
     }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 54253a1619ea3a2..72cd772cfc94131 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1147,6 +1147,8 @@ struct TPlanNode {
   47: optional TTestExternalScanNode test_external_scan_node
 
   48: optional TPushAggOp push_down_agg_type_opt
+
+  49: optional i64 push_down_count
 
   101: optional list<Exprs.TExpr> projections
   102: optional Types.TTupleId output_tuple_id
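End to end, SELECT COUNT(*) on an Iceberg v2 table without equality deletes no longer opens any data file: the FE computes the count from the snapshot summary, ships it in the new TPlanNode.push_down_count field, and the BE fabricates blocks totalling that many rows. A worked example of the arithmetic (numbers illustrative; 4064 is assumed as the batch_size session default):

    public class CountPushDownExample {
        public static void main(String[] args) {
            long totalRecords = 1_000_000L;  // summary key "total-records"
            long positionDeletes = 1_000L;   // summary key "total-position-deletes"
            long equalityDeletes = 0L;       // summary key "total-equality-deletes"
            long batchSize = 4064L;

            // FE side: the value sent as TPlanNode.push_down_count.
            long count = equalityDeletes == 0 ? totalRecords - positionDeletes : -1;
            // BE side: number of fabricated blocks emitted by get_next_block.
            long blocks = (count + batchSize - 1) / batchSize;
            System.out.println(count + " rows in " + blocks + " blocks");
            // Prints: 999000 rows in 246 blocks
        }
    }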