diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index 7b6f3c7b9c733a6..7842f2edb920592 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -36,6 +36,7 @@ class GenericReader {
     void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
         _push_down_agg_type = push_down_agg_type;
     }
+    virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;
 
     virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() {
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 6d2f57258680cc5..1b0f05cc950feec 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -90,14 +90,15 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
                                        RuntimeProfile* profile, RuntimeState* state,
                                        const TFileScanRangeParams& params,
                                        const TFileRangeDesc& range, ShardedKVCache* kv_cache,
-                                       io::IOContext* io_ctx)
+                                       io::IOContext* io_ctx, int64_t push_down_count)
         : TableFormatReader(std::move(file_format_reader)),
           _profile(profile),
           _state(state),
           _params(params),
           _range(range),
           _kv_cache(kv_cache),
-          _io_ctx(io_ctx) {
+          _io_ctx(io_ctx),
+          _remaining_push_down_count(push_down_count) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
     _iceberg_profile.num_delete_files =
@@ -132,10 +133,27 @@ Status IcebergTableReader::init_reader(
             _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
             not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+
     return status;
 }
 
 Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    // The count is already known, so fill the block without reading the file.
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
+        auto rows =
+                std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size);
+        _remaining_push_down_count -= rows;
+        for (auto& col : block->mutate_columns()) {
+            col->resize(rows);
+        }
+        *read_rows = rows;
+        if (_remaining_push_down_count == 0) {
+            *eof = true;
+        }
+
+        return Status::OK();
+    }
+
     // To support iceberg schema evolution. We change the column name in block to
     // make it match with the column name in parquet file before reading data. and
     // Set the name back to table column name before return this block.
@@ -149,6 +167,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
         }
         block->initialize_index_by_name();
     }
+
     auto res = _file_format_reader->get_next_block(block, read_rows, eof);
     // Set the name back to table column name before return this block.
     if (_has_schema_change) {
@@ -182,6 +201,11 @@ Status IcebergTableReader::get_columns(
 }
 
 Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
+    // The count was computed in advance, so there is no need to read the delete files.
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
+        return Status::OK();
+    }
+
     auto& table_desc = range.table_format_params.iceberg_params;
     auto& version = table_desc.format_version;
     if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
@@ -192,10 +216,15 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
     if (files.empty()) {
         return Status::OK();
     }
+
     if (delete_file_type == POSITION_DELETE) {
         RETURN_IF_ERROR(_position_delete(files));
     }
+
+    // TODO: equality delete.
+    // If this is a count operation and the split has equality delete files,
+    // the count push-down for this split needs to be canceled.
     COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
     return Status::OK();
 }
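For reference, the COUNT short-circuit in `get_next_block` above never touches the underlying Parquet reader: it only resizes the block's columns so that each returned batch carries a row count. A minimal standalone sketch of that chunking behavior, with hypothetical values standing in for the pushed-down count and for `_state->query_options().batch_size`:

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

// Standalone sketch of the chunking done by IcebergTableReader::get_next_block:
// a pushed-down count of N rows is surfaced as ceil(N / batch_size) blocks,
// each carrying only a row count (columns resized, no Parquet data read).
int main() {
    int64_t remaining = 10'000'000;  // hypothetical push_down_count from the FE
    const int64_t batch_size = 4064; // hypothetical query-option batch size
    bool eof = false;
    int64_t blocks = 0;
    while (!eof) {
        int64_t rows = std::min(remaining, batch_size);
        remaining -= rows; // same bookkeeping as _remaining_push_down_count
        ++blocks;
        if (remaining == 0) {
            eof = true; // the last synthetic block sets *eof
        }
    }
    std::cout << blocks << " synthetic blocks\n"; // 2461 = ceil(10000000 / 4064)
    return 0;
}
```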
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 451c51445ed9fce..8def49e68cfba72 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -69,8 +69,8 @@ class IcebergTableReader : public TableFormatReader {
     IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
                        RuntimeState* state, const TFileScanRangeParams& params,
-                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
-                       io::IOContext* io_ctx);
+                       const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
+                       int64_t push_down_count);
     ~IcebergTableReader() override = default;
 
     Status init_row_filters(const TFileRangeDesc& range) override;
@@ -154,6 +154,8 @@ class IcebergTableReader : public TableFormatReader {
     io::IOContext* _io_ctx;
     bool _has_schema_change = false;
     bool _has_iceberg_schema = false;
+
+    int64_t _remaining_push_down_count;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 6365f60ed6331c1..bc439b73d256e25 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -699,9 +699,9 @@ Status VFileScanner::_get_next_reader() {
         if (range.__isset.table_format_params &&
             range.table_format_params.table_format_type == "iceberg") {
             std::unique_ptr<IcebergTableReader> iceberg_reader =
-                    IcebergTableReader::create_unique(std::move(parquet_reader), _profile,
-                                                      _state, *_params, range, _kv_cache,
-                                                      _io_ctx.get());
+                    IcebergTableReader::create_unique(
+                            std::move(parquet_reader), _profile, _state, *_params, range,
+                            _kv_cache, _io_ctx.get(), _parent->get_push_down_count());
             init_status = iceberg_reader->init_reader(
                     _file_col_names, _col_id_name_map, _colname_to_value_range,
                     _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index c45524667cabcbf..e65bdab09302e5e 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -126,6 +126,11 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     } else {
         _push_down_agg_type = TPushAggOp::type::NONE;
     }
+
+    if (tnode.__isset.push_down_count) {
+        _push_down_count = tnode.push_down_count;
+    }
+
     return Status::OK();
 }
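The plumbing above relies on a sentinel: `VScanNode::_push_down_count` stays at -1 unless the FE actually set the optional thrift field, and the reader treats any non-positive value as "scan normally". A small sketch of those semantics, using `std::optional` as a stand-in for the thrift `__isset` flag (all names hypothetical):

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

// Sketch of the handoff above: a missing push_down_count leaves the scan node
// at -1, which IcebergTableReader treats as "no pushed-down count".
struct FakePlanNode {                        // stands in for TPlanNode
    std::optional<int64_t> push_down_count;  // 49: optional i64 push_down_count
};

int64_t init_push_down_count(const FakePlanNode& tnode) {
    int64_t push_down_count = -1;              // VScanNode::_push_down_count default
    if (tnode.push_down_count.has_value()) {   // tnode.__isset.push_down_count
        push_down_count = *tnode.push_down_count;
    }
    return push_down_count;  // handed to IcebergTableReader's constructor
}

int main() {
    std::cout << init_push_down_count({}) << "\n";    // -1: scan normally
    std::cout << init_push_down_count({963}) << "\n"; // 963: synthesize the count
}
```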
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 6b1922b0b8c0ee9..c023bbabe2a1761 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -136,6 +136,9 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
     }
 
     TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
+
+    int64_t get_push_down_count() { return _push_down_count; }
+
     // Get next block.
     // If eos is true, no more data will be read and block should be empty.
     Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
@@ -349,6 +352,9 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
     TPushAggOp::type _push_down_agg_type;
 
+    // Records the value of the pushed-down aggregate function 'count', or -1 if unset.
+    int64_t _push_down_count = -1;
+
 private:
     Status _normalize_conjuncts();
     Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 579b80586be2726..8dcc8043b447d39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1191,6 +1191,10 @@ public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) {
         this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp;
     }
 
+    public TPushAggOp getPushDownAggNoGroupingOp() {
+        return pushDownAggNoGroupingOp;
+    }
+
     public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
         return false;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 5fa64fa2b6a7e53..7293982ebbc3c62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -18,8 +18,10 @@
 package org.apache.doris.planner.external.iceberg;
 
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.ExternalTable;
@@ -43,6 +45,8 @@
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TIcebergDeleteFileDesc;
 import org.apache.doris.thrift.TIcebergFileDesc;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
 import avro.shaded.com.google.common.base.Preconditions;
@@ -55,6 +59,7 @@
 import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
@@ -68,6 +73,7 @@
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -79,6 +85,9 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
     public static final String DEFAULT_DATA_PATH = "/data/";
+    private static final String TOTAL_RECORDS = "total-records";
+    private static final String TOTAL_POSITION_DELETES = "total-position-deletes";
+    private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
 
     private IcebergSource source;
     private Table icebergTable;
@@ -210,8 +219,8 @@ public List<Split> getSplits() throws UserException {
                         splitTask.length(),
                         splitTask.file().fileSizeInBytes(),
                         new String[0],
+                        formatVersion,
                         source.getCatalog().getProperties());
-                split.setFormatVersion(formatVersion);
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
                 }
@@ -222,6 +231,12 @@ public List<Split> getSplits() throws UserException {
             throw new UserException(e.getMessage(), e.getCause());
         }
 
+        TPushAggOp aggOp = getPushDownAggNoGroupingOp();
+        if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
+            // A single split is enough: the count comes from the snapshot, not from scanning.
+            return Collections.singletonList(splits.get(0));
+        }
+
         readPartitionNum = partitionPathSet.size();
 
         return splits;
@@ -334,4 +349,50 @@ public TableIf getTargetTable() {
     public Map<String, String> getLocationProperties() throws UserException {
         return source.getCatalog().getProperties();
     }
+
+    @Override
+    public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
+        String aggFunctionName = aggExpr.getFnName().getFunction().toUpperCase();
+        return "COUNT".equals(aggFunctionName);
+    }
+
+    @Override
+    public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) {
+        return !col.isAllowNull();
+    }
+
+    private long getCountFromSnapshot() {
+        Long specifiedSnapshot;
+        try {
+            specifiedSnapshot = getSpecifiedSnapshot();
+        } catch (UserException e) {
+            return -1;
+        }
+
+        Snapshot snapshot = specifiedSnapshot == null
+                ? icebergTable.currentSnapshot() : icebergTable.snapshot(specifiedSnapshot);
+
+        // empty table
+        if (snapshot == null) {
+            return -1;
+        }
+
+        Map<String, String> summary = snapshot.summary();
+        if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) {
+            return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    protected void toThrift(TPlanNode planNode) {
+        super.toThrift(planNode);
+        if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
+            long countFromSnapshot = getCountFromSnapshot();
+            if (countFromSnapshot > 0) {
+                planNode.setPushDownCount(countFromSnapshot);
+            }
+        }
+    }
 }
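getCountFromSnapshot leans on the Iceberg snapshot summary: every position delete removes exactly one row, so total-records minus total-position-deletes is exact, while an equality delete can match any number of rows, which is why any nonzero total-equality-deletes disables the push-down. A small C++ transliteration of that arithmetic, assuming all three summary keys are present:

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Sketch of the FE-side decision in getCountFromSnapshot(). Returns -1 when
// the count cannot be derived from snapshot metadata alone.
int64_t count_from_summary(const std::map<std::string, std::string>& summary) {
    if (summary.at("total-equality-deletes") != "0") {
        return -1; // equality deletes: the affected row count is unknown
    }
    return std::stoll(summary.at("total-records")) -
           std::stoll(summary.at("total-position-deletes"));
}

int main() {
    std::map<std::string, std::string> summary = {
            {"total-records", "1000"},
            {"total-position-deletes", "37"},
            {"total-equality-deletes", "0"},
    };
    std::cout << count_from_summary(summary) << "\n"; // prints 963
}
```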
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index de3f2ec6aad5145..29deb293b3db75c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -27,10 +27,12 @@
 @Data
 public class IcebergSplit extends FileSplit {
 
+    // The file path changes whenever the file is modified, so there is no need to record the modification time.
     public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
-                        Map<String, String> config) {
+                        Integer formatVersion, Map<String, String> config) {
         super(file, start, length, fileLength, hosts, null);
+        this.formatVersion = formatVersion;
         this.config = config;
     }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 54253a1619ea3a2..72cd772cfc94131 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1147,6 +1147,8 @@ struct TPlanNode {
   47: optional TTestExternalScanNode test_external_scan_node
 
   48: optional TPushAggOp push_down_agg_type_opt
+
+  49: optional i64 push_down_count
 
   101: optional list<Exprs.TExpr> projections
   102: optional Types.TTupleId output_tuple_id
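End to end: the FE sets push_down_count in toThrift only when the COUNT push-down applies and the snapshot metadata yields a usable value, and because the thrift field is optional, a BE that receives a plan without it keeps _push_down_count at its -1 default and scans normally. When the field is set, IcebergTableReader skips both delete-file loading and the Parquet scan and synthesizes the count in batches. The TODO in init_row_filters notes that splits with equality delete files must cancel the push-down; today the FE already guarantees this by returning -1 from getCountFromSnapshot whenever total-equality-deletes is nonzero.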