From e6befffa43c15553f3dbaa9ad6d7ad1907e4a7f1 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Sat, 29 Jul 2023 00:31:01 +0800 Subject: [PATCH] [opt](hive)opt select count(*) stmt push down agg on parquet in hive . (#22115) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Optimization "select count(*) from table" stmtement , push down "count" type to BE. support file type : parquet ,orc in hive . 1. 4kfiles , 60kwline num before: 1 min 37.70 sec after: 50.18 sec 2. 50files , 60kwline num before: 1.12 sec after: 0.82 sec --- be/src/vec/exec/format/generic_reader.h | 8 + be/src/vec/exec/format/orc/vorc_reader.cpp | 18 ++ be/src/vec/exec/format/orc/vorc_reader.h | 3 + .../format/parquet/vparquet_group_reader.h | 2 + .../exec/format/parquet/vparquet_reader.cpp | 18 ++ be/src/vec/exec/scan/new_olap_scan_node.cpp | 8 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 12 +- be/src/vec/exec/scan/scanner_scheduler.cpp | 1 + be/src/vec/exec/scan/vfile_scanner.cpp | 2 + be/src/vec/exec/scan/vscan_node.cpp | 13 ++ be/src/vec/exec/scan/vscan_node.h | 3 + .../implementation/AggregateStrategies.java | 88 +++++++--- .../apache/doris/planner/OlapScanNode.java | 41 ++++- .../org/apache/doris/planner/PlanNode.java | 17 ++ .../doris/planner/SingleNodePlanner.java | 21 +-- .../doris/planner/external/FileScanNode.java | 2 + .../doris/planner/external/HiveScanNode.java | 23 ++- gensrc/thrift/PlanNodes.thrift | 4 +- .../hive/test_select_count_optimize.out | 157 ++++++++++++++++++ .../performance_p0/redundant_conjuncts.out | 2 + .../hive/test_select_count_optimize.groovy | 91 ++++++++++ 21 files changed, 471 insertions(+), 63 deletions(-) create mode 100644 regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out create mode 100644 regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index 81ffdcf6fbf219..7b6f3c7b9c733a 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "common/factory_creator.h" #include "common/status.h" #include "runtime/types.h" @@ -30,7 +32,12 @@ class Block; // a set of blocks with specified schema, class GenericReader { public: + GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {} + void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) { + _push_down_agg_type = push_down_agg_type; + } virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0; + virtual std::unordered_map get_name_to_type() { std::unordered_map map; return map; @@ -67,6 +74,7 @@ class GenericReader { /// Whether the underlying FileReader has filled the partition&missing columns bool _fill_all_columns = false; + TPushAggOp::type _push_down_agg_type; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index cdb3b28f4de8e6..c93c7a7590bdf3 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -239,6 +239,8 @@ Status OrcReader::_create_file_reader() { } catch (std::exception& e) { return Status::InternalError("Init OrcReader failed. 
reason = {}", e.what()); } + _remaining_rows = _reader->getNumberOfRows(); + return Status::OK(); } @@ -1373,6 +1375,22 @@ std::string OrcReader::_get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + if (_push_down_agg_type == TPushAggOp::type::COUNT) { + auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size); + + set_remaining_rows(get_remaining_rows() - rows); + + for (auto& col : block->mutate_columns()) { + col->resize(rows); + } + + *read_rows = rows; + if (get_remaining_rows() == 0) { + *eof = true; + } + return Status::OK(); + } + if (_lazy_read_ctx.can_lazy_read) { std::vector columns_to_filter; int column_to_keep = block->columns(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 41d919578f3445..9b5c1fe57693d1 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -487,8 +487,11 @@ class OrcReader : public GenericReader { const NullMap* null_map, orc::ColumnVectorBatch* cvb, const orc::Type* orc_column_typ); + int64_t get_remaining_rows() { return _remaining_rows; } + void set_remaining_rows(int64_t rows) { _remaining_rows = rows; } private: + int64_t _remaining_rows = 0; RuntimeProfile* _profile = nullptr; RuntimeState* _state = nullptr; const TFileScanRangeParams& _scan_params; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index ef35f5adf21819..baa5912f990088 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -158,6 +158,8 @@ class RowGroupReader { int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; } ParquetColumnReader::Statistics statistics(); + void set_remaining_rows(int64_t rows) { _remaining_rows = rows; } + int64_t get_remaining_rows() { return _remaining_rows; } private: void _merge_read_ranges(std::vector& row_ranges); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 38dc2a923affe1..ba80992a04f90a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -528,6 +528,24 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) } } DCHECK(_current_group_reader != nullptr); + if (_push_down_agg_type == TPushAggOp::type::COUNT) { + auto rows = std::min(_current_group_reader->get_remaining_rows(), (int64_t)_batch_size); + + _current_group_reader->set_remaining_rows(_current_group_reader->get_remaining_rows() - + rows); + + for (auto& col : block->mutate_columns()) { + col->resize(rows); + } + + *read_rows = rows; + if (_current_group_reader->get_remaining_rows() == 0) { + _current_group_reader.reset(nullptr); + } + + return Status::OK(); + } + { SCOPED_RAW_TIMER(&_statistics.column_read_time); Status batch_st = diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index d53094b73ca600..3af5bb9f8945e5 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -249,8 +249,7 @@ Status NewOlapScanNode::_process_conjuncts() { } Status NewOlapScanNode::_build_key_ranges_and_filters() { - if (!_olap_scan_node.__isset.push_down_agg_type_opt || - _olap_scan_node.push_down_agg_type_opt == TPushAggOp::NONE) { + if (_push_down_agg_type == 
TPushAggOp::NONE) { const std::vector& column_names = _olap_scan_node.key_column_name; const std::vector& column_types = _olap_scan_node.key_column_type; DCHECK(column_types.size() == column_names.size()); @@ -326,9 +325,8 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() { range); } } else { - _runtime_profile->add_info_string( - "PushDownAggregate", - push_down_agg_to_string(_olap_scan_node.push_down_agg_type_opt)); + _runtime_profile->add_info_string("PushDownAggregate", + push_down_agg_to_string(_push_down_agg_type)); } if (_state->enable_profile()) { diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 702d94c5f31885..51864acb24d869 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -248,14 +248,13 @@ Status NewOlapScanner::_init_tablet_reader_params( const std::vector& function_filters) { // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty const bool single_version = _tablet_reader_params.has_single_version(); - auto real_parent = reinterpret_cast(_parent); + if (_state->skip_storage_engine_merge()) { _tablet_reader_params.direct_mode = true; _aggregation = true; } else { - _tablet_reader_params.direct_mode = - _aggregation || single_version || - real_parent->_olap_scan_node.__isset.push_down_agg_type_opt; + _tablet_reader_params.direct_mode = _aggregation || single_version || + (_parent->get_push_down_agg_type() != TPushAggOp::NONE); } RETURN_IF_ERROR(_init_return_columns()); @@ -264,10 +263,7 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.tablet_schema = _tablet_schema; _tablet_reader_params.reader_type = ReaderType::READER_QUERY; _tablet_reader_params.aggregation = _aggregation; - if (real_parent->_olap_scan_node.__isset.push_down_agg_type_opt) { - _tablet_reader_params.push_down_agg_type_opt = - real_parent->_olap_scan_node.push_down_agg_type_opt; - } + _tablet_reader_params.push_down_agg_type_opt = _parent->get_push_down_agg_type(); _tablet_reader_params.version = Version(0, _version); // TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`, diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 632e28e00aeb11..3fa10c9d78af66 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -47,6 +47,7 @@ #include "vec/core/block.h" #include "vec/exec/scan/new_olap_scanner.h" // IWYU pragma: keep #include "vec/exec/scan/scanner_context.h" +#include "vec/exec/scan/vscan_node.h" #include "vec/exec/scan/vscanner.h" #include "vfile_scanner.h" diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 179da42d5f9c1d..e30d7640ed5bdc 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -245,6 +245,7 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo RETURN_IF_ERROR(_init_src_block(block)); { SCOPED_TIMER(_get_block_timer); + // Read next block. 
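            // Note (illustrative comment, not part of the original hunk): when a
            // COUNT aggregate has been pushed down to this scan (see the
            // set_push_down_agg_type() call added in _get_next_reader() below),
            // the ORC/Parquet reader derives row counts from file metadata and
            // only resizes the block's columns to that count; no column data is
            // decoded from the file.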
// Some of column in block may not be filled (column not exist in file) RETURN_IF_ERROR( @@ -737,6 +738,7 @@ Status VFileScanner::_get_next_reader() { _name_to_col_type.clear(); _missing_cols.clear(); _cur_reader->get_columns(&_name_to_col_type, &_missing_cols); + _cur_reader->set_push_down_agg_type(_parent->get_push_down_agg_type()); RETURN_IF_ERROR(_generate_fill_columns()); if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { fmt::memory_buffer col_buf; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 66bfe6386d26fc..679edf5370676d 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -113,6 +113,19 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { } else { _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; } + + // tnode.olap_scan_node.push_down_agg_type_opt field is deprecated + // Introduced a new field : tnode.push_down_agg_type_opt + // + // make it compatible here + if (tnode.__isset.push_down_agg_type_opt) { + _push_down_agg_type = tnode.push_down_agg_type_opt; + } else if (tnode.olap_scan_node.__isset.push_down_agg_type_opt) { + _push_down_agg_type = tnode.olap_scan_node.push_down_agg_type_opt; + + } else { + _push_down_agg_type = TPushAggOp::type::NONE; + } return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 83e8909280d90d..822ae8c1d7d3c3 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -135,6 +135,7 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { } } + TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } // Get next block. // If eos is true, no more data will be read and block should be empty. 
Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; @@ -351,6 +352,8 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { std::unordered_map _colname_to_slot_id; std::vector _col_distribute_ids; + TPushAggOp::type _push_down_agg_type; + private: Status _normalize_conjuncts(); Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java index 6df6b2f8174e30..3912315f4aec2b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java @@ -55,8 +55,11 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Project; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate; @@ -115,6 +118,20 @@ public List buildRules() { return storageLayerAggregate(agg, project, olapScan, ctx.cascadesContext); }) ), + RuleType.STORAGE_LAYER_AGGREGATE_WITH_PROJECT.build( + logicalAggregate( + logicalProject( + logicalFileScan() + ) + ) + .when(agg -> agg.isNormalized() && enablePushDownNoGroupAgg()) + .thenApply(ctx -> { + LogicalAggregate> agg = ctx.root; + LogicalProject project = agg.child(); + LogicalFileScan fileScan = project.child(); + return storageLayerAggregate(agg, project, fileScan, ctx.cascadesContext); + }) + ), RuleType.ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT.build( basePattern .when(agg -> agg.getDistinctArguments().size() == 0) @@ -190,14 +207,19 @@ && couldConvertToMulti(agg)) private LogicalAggregate storageLayerAggregate( LogicalAggregate aggregate, @Nullable LogicalProject project, - LogicalOlapScan olapScan, CascadesContext cascadesContext) { + LogicalRelation logicalScan, CascadesContext cascadesContext) { final LogicalAggregate canNotPush = aggregate; - KeysType keysType = olapScan.getTable().getKeysType(); - if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS) { + if (!(logicalScan instanceof LogicalOlapScan) && !(logicalScan instanceof LogicalFileScan)) { return canNotPush; } + if (logicalScan instanceof LogicalOlapScan) { + KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType(); + if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS) { + return canNotPush; + } + } List groupByExpressions = aggregate.getGroupByExpressions(); if (!groupByExpressions.isEmpty() || !aggregate.getDistinctArguments().isEmpty()) { return canNotPush; @@ -213,8 +235,11 @@ private LogicalAggregate storageLayerAggregate( if (!supportedAgg.keySet().containsAll(functionClasses)) { return canNotPush; } - if (functionClasses.contains(Count.class) && keysType != KeysType.DUP_KEYS) { - return canNotPush; + if 
(logicalScan instanceof LogicalOlapScan) { + KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType(); + if (functionClasses.contains(Count.class) && keysType != KeysType.DUP_KEYS) { + return canNotPush; + } } if (aggregateFunctions.stream().anyMatch(fun -> fun.arity() > 1)) { return canNotPush; @@ -281,12 +306,15 @@ private LogicalAggregate storageLayerAggregate( ExpressionUtils.collect(argumentsOfAggregateFunction, SlotReference.class::isInstance); List usedSlotInTable = (List) (List) Project.findProject(aggUsedSlots, - (List) (List) olapScan.getOutput()); + (List) (List) logicalScan.getOutput()); for (SlotReference slot : usedSlotInTable) { Column column = slot.getColumn().get(); - if (keysType == KeysType.AGG_KEYS && !column.isKey()) { - return canNotPush; + if (logicalScan instanceof LogicalOlapScan) { + KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType(); + if (keysType == KeysType.AGG_KEYS && !column.isKey()) { + return canNotPush; + } } // The zone map max length of CharFamily is 512, do not // over the length: https://github.com/apache/doris/pull/6293 @@ -310,19 +338,41 @@ private LogicalAggregate storageLayerAggregate( } } - PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan() - .build() - .transform(olapScan, cascadesContext) - .get(0); - if (project != null) { - return aggregate.withChildren(ImmutableList.of( + if (logicalScan instanceof LogicalOlapScan) { + PhysicalOlapScan physicalScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan() + .build() + .transform((LogicalOlapScan) logicalScan, cascadesContext) + .get(0); + + if (project != null) { + return aggregate.withChildren(ImmutableList.of( project.withChildren( - ImmutableList.of(new PhysicalStorageLayerAggregate(physicalOlapScan, mergeOp))) - )); + ImmutableList.of(new PhysicalStorageLayerAggregate(physicalScan, mergeOp))) + )); + } else { + return aggregate.withChildren(ImmutableList.of( + new PhysicalStorageLayerAggregate(physicalScan, mergeOp) + )); + } + + } else if (logicalScan instanceof LogicalFileScan) { + PhysicalFileScan physicalScan = (PhysicalFileScan) new LogicalFileScanToPhysicalFileScan() + .build() + .transform((LogicalFileScan) logicalScan, cascadesContext) + .get(0); + if (project != null) { + return aggregate.withChildren(ImmutableList.of( + project.withChildren( + ImmutableList.of(new PhysicalStorageLayerAggregate(physicalScan, mergeOp))) + )); + } else { + return aggregate.withChildren(ImmutableList.of( + new PhysicalStorageLayerAggregate(physicalScan, mergeOp) + )); + } + } else { - return aggregate.withChildren(ImmutableList.of( - new PhysicalStorageLayerAggregate(physicalOlapScan, mergeOp) - )); + return canNotPush; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 6f10836a90f552..7b3e95c3a0b544 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.CreateMaterializedViewStmt; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.InPredicate; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.PartitionNames; @@ -77,7 +78,6 @@ import org.apache.doris.thrift.TPlanNode; import 
org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TPrimitiveType; -import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; @@ -170,7 +170,6 @@ public class OlapScanNode extends ScanNode { private boolean useTopnOpt = false; - private TPushAggOp pushDownAggNoGroupingOp = null; // List of tablets will be scanned by current olap_scan_node private ArrayList scanTabletIds = Lists.newArrayList(); @@ -223,9 +222,6 @@ public void setIsPreAggregation(boolean isPreAggregation, String reason) { this.reasonOfPreAggregation + " " + reason; } - public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) { - this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp; - } public boolean isPreAggregation() { return isPreAggregation; @@ -1363,9 +1359,10 @@ protected void toThrift(TPlanNode msg) { msg.olap_scan_node.setTableName(olapTable.getName()); msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite()); - if (pushDownAggNoGroupingOp != null) { - msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); - } + msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); + + msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); + // In TOlapScanNode , pushDownAggNoGroupingOp field is deprecated. if (outputColumnUniqueIds != null) { msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds); @@ -1580,4 +1577,32 @@ public StatsDelta genStatsDelta() throws AnalysisException { olapTable.getId(), selectedIndexId == -1 ? olapTable.getBaseIndexId() : selectedIndexId, scanReplicaIds); } + + @Override + public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { + KeysType type = getOlapTable().getKeysType(); + if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) { + return false; + } + + String aggFunctionName = aggExpr.getFnName().getFunction(); + if (aggFunctionName.equalsIgnoreCase("COUNT") && type != KeysType.DUP_KEYS) { + return false; + } + + return true; + } + + @Override + public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) { + KeysType type = getOlapTable().getKeysType(); + + // The value column of the agg does not support zone_map index. 
+ if (type == KeysType.AGG_KEYS && !col.isKey()) { + return false; + } + + return true; + } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index a279af676e3af2..e11f6748758437 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -27,11 +27,13 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprId; import org.apache.doris.analysis.ExprSubstitutionMap; +import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.FunctionName; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Function; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Type; @@ -46,6 +48,7 @@ import org.apache.doris.thrift.TFunctionBinaryType; import org.apache.doris.thrift.TPlan; import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPushAggOp; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -1175,4 +1178,18 @@ public void setConjuncts(Set exprs) { public void setCardinalityAfterFilter(long cardinalityAfterFilter) { this.cardinalityAfterFilter = cardinalityAfterFilter; } + + protected TPushAggOp pushDownAggNoGroupingOp = TPushAggOp.NONE; + + public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) { + this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp; + } + + public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { + return false; + } + + public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 8c4b4bb0f0ce49..3bce97a0c433fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -56,7 +56,6 @@ import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FunctionSet; -import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.PrimitiveType; @@ -423,16 +422,6 @@ private TPushAggOp freshTPushAggOpByName(String functionName, TPushAggOp originA private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, Analyzer analyzer, PlanNode root) { do { - // TODO: Support other scan node in the future - if (!(root instanceof OlapScanNode)) { - break; - } - - KeysType type = ((OlapScanNode) root).getOlapTable().getKeysType(); - if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) { - break; - } - if (CollectionUtils.isNotEmpty(root.getConjuncts())) { break; } @@ -457,7 +446,6 @@ private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, boolean aggExprValidate = true; TPushAggOp aggOp = null; for (FunctionCallExpr aggExpr : aggExprs) { - // Only support `min`, `max`, `count` and `count` only effective in dup table String functionName = aggExpr.getFnName().getFunction(); if (!functionName.equalsIgnoreCase("MAX") && !functionName.equalsIgnoreCase("MIN") @@ 
-466,8 +454,7 @@ private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, break; } - if (functionName.equalsIgnoreCase("COUNT") - && type != KeysType.DUP_KEYS) { + if (!root.pushDownAggNoGrouping(aggExpr)) { aggExprValidate = false; break; } @@ -512,8 +499,7 @@ private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, continue; } - // The value column of the agg does not support zone_map index. - if (type == KeysType.AGG_KEYS && !col.isKey()) { + if (!root.pushDownAggNoGroupingCheckCol(aggExpr, col)) { returnColumnValidate = false; break; } @@ -556,8 +542,7 @@ private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, break; } - OlapScanNode olapNode = (OlapScanNode) root; - olapNode.setPushDownAggNoGrouping(aggOp); + root.setPushDownAggNoGrouping(aggOp); } while (false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java index 5af4a5f123e5c4..38be399244c109 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java @@ -76,6 +76,8 @@ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, St @Override protected void toThrift(TPlanNode planNode) { + planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); + planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE); TFileScanNode fileScanNode = new TFileScanNode(); fileScanNode.setTupleId(desc.getId().asInt()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 6aa8c3185e275a..211f6e80569976 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -17,6 +17,7 @@ package org.apache.doris.planner.external; +import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; @@ -67,13 +68,13 @@ public class HiveScanNode extends FileQueryScanNode { public static final String PROP_FIELD_DELIMITER = "field.delim"; public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01" - public static final String PROP_LINE_DELIMITER = "line.delim"; public static final String DEFAULT_LINE_DELIMITER = "\n"; public static final String PROP_ARRAY_DELIMITER_HIVE2 = "colelction.delim"; public static final String PROP_ARRAY_DELIMITER_HIVE3 = "collection.delim"; public static final String DEFAULT_ARRAY_DELIMITER = "\2"; + protected final HMSExternalTable hmsTable; private HiveTransaction hiveTransaction = null; @@ -105,12 +106,11 @@ protected void doInitialize() throws UserException { for (SlotDescriptor slot : desc.getSlots()) { if (slot.getType().isMapType() || slot.getType().isStructType()) { throw new UserException("For column `" + slot.getColumn().getName() - + "`, The column types MAP/STRUCT are not supported yet" - + " for text input format of Hive. "); + + "`, The column types MAP/STRUCT are not supported yet" + + " for text input format of Hive. 
"); } } } - if (hmsTable.isHiveTransactionalTable()) { this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()), ConnectContext.get().getQualifiedUser(), hmsTable, hmsTable.isFullAcidTable()); @@ -310,4 +310,19 @@ private void genSlotToSchemaIdMap() { } params.setSlotNameToSchemaPos(columnNameToPosition); } + + @Override + public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { + + String aggFunctionName = aggExpr.getFnName().getFunction(); + if (aggFunctionName.equalsIgnoreCase("COUNT")) { + return true; + } + return false; + } + + @Override + public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) { + return !col.isAllowNull(); + } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 05f9f0972d5b83..9318b593a397cb 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -636,7 +636,7 @@ struct TOlapScanNode { // It's limit for scanner instead of scanNode so we add a new limit. 10: optional i64 sort_limit 11: optional bool enable_unique_key_merge_on_write - 12: optional TPushAggOp push_down_agg_type_opt + 12: optional TPushAggOp push_down_agg_type_opt //Deprecated 13: optional bool use_topn_opt 14: optional list indexes_desc 15: optional set output_column_unique_ids @@ -1142,6 +1142,8 @@ struct TPlanNode { 46: optional TNestedLoopJoinNode nested_loop_join_node 47: optional TTestExternalScanNode test_external_scan_node + 48: optional TPushAggOp push_down_agg_type_opt + 101: optional list projections 102: optional Types.TTupleId output_tuple_id 103: optional TPartitionSortNode partition_sort_node diff --git a/regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out b/regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out new file mode 100644 index 00000000000000..9f76685f652d96 --- /dev/null +++ b/regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out @@ -0,0 +1,157 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 ALGERIA 0 haggle. carefully final deposits detect slyly agai +1 ARGENTINA 1 al foxes promise slyly according to the regular accounts. bold requests alon +2 BRAZIL 1 y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special +3 CANADA 1 eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold +18 CHINA 2 c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos +4 EGYPT 4 y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d +5 ETHIOPIA 0 ven packages wake quickly. regu +6 FRANCE 3 refully final requests. regular, ironi +7 GERMANY 3 l platelets. regular accounts x-ray: unusual, regular acco +8 INDIA 2 ss excuses cajole slyly across the packages. deposits print aroun +9 INDONESIA 2 slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull +10 IRAN 4 efully alongside of the slyly final dependencies. +11 IRAQ 4 nic deposits boost atop the quickly final requests? quickly regula +12 JAPAN 2 ously. final, express gifts cajole a +13 JORDAN 4 ic deposits are blithely about the carefully regular pa +14 KENYA 0 pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t +15 MOROCCO 0 rns. blithely bold courts among the closely regular packages use furiously bold platelets? 
+16 MOZAMBIQUE 0 s. ironic, unusual asymptotes wake blithely r +17 PERU 1 platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun +19 ROMANIA 3 ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account +22 RUSSIA 3 requests against the platelets use never according to the quickly regular pint +20 SAUDI ARABIA 4 ts. silent requests haggle. closely express packages sleep across the blithely +23 UNITED KINGDOM 3 eans boost carefully special requests. accounts are. carefull +24 UNITED STATES 1 y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be +21 VIETNAM 2 hely enticingly express accounts. even, final + +-- !sql -- +25 + +-- !sql -- +25 + +-- !sql -- +0 + +-- !sql -- +5 + +-- !sql -- +4 + +-- !sql -- +0 + +-- !sql -- +5 +5 +5 +5 +5 + +-- !sql -- +5999989709 + +-- !sql -- +200000000 + +-- !sql -- +200000000 + +-- !sql -- +3995860 +3996114 +3997119 +3997177 +3997193 +3997623 +3998060 +3998197 +3998199 +3998205 +3998246 +3998259 +3998308 +3998860 +3998903 +3999137 +3999286 +3999411 +3999441 +3999477 +3999643 +3999670 +3999687 +3999830 +4000095 +4000151 +4000164 +4000268 +4000572 +4000594 +4000664 +4000672 +4000711 +4001091 +4001127 +4001273 +4001351 +4001463 +4001520 +4001568 +4001718 +4001940 +4001942 +4002064 +4002067 +4002305 +4002815 +4002966 +4003245 +4003749 + +-- !sql -- +3999286 + +-- !sql -- +210000000 + +-- !sql -- +1 + +-- !sql -- +200000000 + +-- !sql -- +200000000 + +-- !sql -- +3995860 +3996114 +3997119 + +-- !sql -- +210000000 + +-- !sql -- +ALGERIA +ARGENTINA + +-- !sql -- +25 + +-- !sql -- +0 + +-- !sql -- +1 + +-- !sql -- +5 +5 +5 +5 +5 + diff --git a/regression-test/data/performance_p0/redundant_conjuncts.out b/regression-test/data/performance_p0/redundant_conjuncts.out index 3baa5b3d930b5e..f82e0c94536911 100644 --- a/regression-test/data/performance_p0/redundant_conjuncts.out +++ b/regression-test/data/performance_p0/redundant_conjuncts.out @@ -12,6 +12,7 @@ PLAN FRAGMENT 0 PREDICATES: `k1` = 1 partitions=0/1, tablets=0/0, tabletList= cardinality=0, avgRowSize=8.0, numNodes=1 + pushAggOp=NONE -- !redundant_conjuncts_gnerated_by_extract_common_filter -- PLAN FRAGMENT 0 @@ -26,4 +27,5 @@ PLAN FRAGMENT 0 PREDICATES: `k1` = 1 OR `k1` = 2 partitions=0/1, tablets=0/0, tabletList= cardinality=0, avgRowSize=8.0, numNodes=1 + pushAggOp=NONE diff --git a/regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy b/regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy new file mode 100644 index 00000000000000..2a95dc4294c732 --- /dev/null +++ b/regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_select_count_optimize", "p2") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + String catalog_name = "test_select_count_optimize" + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'type'='hms', + 'hadoop.username' = 'hadoop', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + sql """ set query_timeout=3600; """ + + //parquet + qt_sql """ select * from tpch_1000_parquet.nation order by n_name,n_regionkey,n_nationkey,n_comment ; """ + + qt_sql """ select count(*) from tpch_1000_parquet.nation; """ + + qt_sql """ select count(1024) from tpch_1000_parquet.nation; """ + + qt_sql """ select count(null) from tpch_1000_parquet.nation; """ + + + qt_sql """ select count(*) from tpch_1000_parquet.nation where n_regionkey = 0; """ + + qt_sql """ select max(n_regionkey) from tpch_1000_parquet.nation ;""" + + qt_sql """ select min(n_regionkey) from tpch_1000_parquet.nation ; """ + + qt_sql """ select count(*) as a from tpch_1000_parquet.nation group by n_regionkey order by a; """ + + qt_sql """ select count(*) from tpch_1000_parquet.lineitem; """ + + qt_sql """ select count(*) from tpch_1000_parquet.part; """ + + qt_sql """ select count(p_partkey) from tpch_1000_parquet.part; """ + + qt_sql """ select count(*) as sz from tpch_1000_parquet.part group by p_size order by sz ;""" + + qt_sql """ select count(*) from tpch_1000_parquet.part where p_size = 1; """ + + qt_sql """ select count(*) from user_profile.hive_hll_user_profile_wide_table_parquet; """; + + //orc + qt_sql """ select count(*) from tpch_1000_orc.part where p_partkey=1; """ + + qt_sql """ select max(p_partkey) from tpch_1000_orc.part ; """ + + qt_sql """ select count(p_comment) from tpch_1000_orc.part; """ + + qt_sql """ select count(*) as a from tpch_1000_orc.part group by p_size order by a limit 3 ; """ + + qt_sql """ select count(*) from user_profile.hive_hll_user_profile_wide_table_orc ; """ ; + + //other + qt_sql """ select n_name from tpch_1000.nation order by n_name limit 2; """ + + qt_sql """ select count(*) from tpch_1000.nation; """ + + qt_sql """ select min(n_regionkey) from tpch_1000.nation ; """ + + qt_sql """ select count(*) from tpch_1000.nation where n_nationkey=5;""" + + qt_sql """ select count(*) as a from tpch_1000.nation group by n_regionkey order by a;""" + + } +} +
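The core of the BE-side change above is that, once the frontend pushes a COUNT-without-GROUP-BY aggregate down to the scan, the ORC and Parquet readers stop decoding column data and instead emit batch-sized row counts taken from file metadata (the total row count reported by the ORC footer, and per-row-group row counts for Parquet). The standalone sketch below illustrates that idea only; it is a simplified model under assumed names (CountOnlyReader, PushAggOp) and is not the actual Doris reader API.

// Minimal standalone sketch of the count(*) push-down idea: with COUNT pushed
// down, the reader skips column decoding and just reports metadata row counts,
// batch by batch.
#include <algorithm>
#include <cstdint>
#include <iostream>

enum class PushAggOp { NONE, COUNT };

class CountOnlyReader {
public:
    CountOnlyReader(int64_t total_rows, int64_t batch_size)
            : _remaining_rows(total_rows), _batch_size(batch_size) {}

    void set_push_down_agg_type(PushAggOp op) { _push_down_agg_type = op; }

    // Returns the number of rows "produced" by this call and sets *eof when the
    // file is exhausted. On the COUNT path no column data is materialized; the
    // caller would only resize its output columns to the returned row count.
    int64_t get_next_block(bool* eof) {
        if (_push_down_agg_type == PushAggOp::COUNT) {
            int64_t rows = std::min(_remaining_rows, _batch_size);
            _remaining_rows -= rows;
            *eof = (_remaining_rows == 0);
            return rows;
        }
        // Normal path (decoding real column data) is omitted in this sketch.
        *eof = true;
        return 0;
    }

private:
    PushAggOp _push_down_agg_type = PushAggOp::NONE;
    int64_t _remaining_rows;
    int64_t _batch_size;
};

int main() {
    // Pretend the file footer reports 10,000,000 rows and the batch size is 4096.
    CountOnlyReader reader(10'000'000, 4096);
    reader.set_push_down_agg_type(PushAggOp::COUNT);

    int64_t total = 0;
    bool eof = false;
    while (!eof) {
        total += reader.get_next_block(&eof);
    }
    std::cout << "count(*) = " << total << std::endl;  // prints 10000000
    return 0;
}

Because the row counts come from metadata alone and no predicate filtering happens in the reader, the optimization is only applied when the FE checks in this patch hold: no conjuncts on the scan, no GROUP BY or DISTINCT, COUNT as the pushed function for file scans, and (per HiveScanNode.pushDownAggNoGroupingCheckCol) no nullable column under count(col).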