From d5e00d0e7a758c13254ba6380153fed884e16687 Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Sat, 22 Jul 2023 21:04:16 +0800 Subject: [PATCH 01/10] [opt](hive)opt select count(*) stmt push down agg on hive --- be/src/vec/exec/format/generic_reader.h | 5 + .../format/parquet/vparquet_group_reader.h | 2 +- .../exec/format/parquet/vparquet_reader.cpp | 29 ++++ .../vec/exec/format/parquet/vparquet_reader.h | 1 + be/src/vec/exec/scan/new_olap_scan_node.cpp | 7 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 27 +++- be/src/vec/exec/scan/scanner_scheduler.cpp | 1 + be/src/vec/exec/scan/vfile_scanner.cpp | 12 ++ be/src/vec/exec/scan/vscan_node.cpp | 1 + be/src/vec/exec/scan/vscan_node.h | 3 +- .../implementation/AggregateStrategies.java | 147 ++++++++++++++++++ .../apache/doris/planner/OlapScanNode.java | 37 ++++- .../org/apache/doris/planner/PlanNode.java | 17 ++ .../doris/planner/SingleNodePlanner.java | 23 +-- .../doris/planner/external/FileScanNode.java | 4 + .../doris/planner/external/HiveScanNode.java | 47 ++++-- gensrc/thrift/PlanNodes.thrift | 13 +- 17 files changed, 314 insertions(+), 62 deletions(-) diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index d83cc1d2ce9866..54206722ca9960 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -31,6 +31,11 @@ class Block; class GenericReader { public: virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0; + + virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof,TPushAggOp::type push_down_agg_type_opt) { + return Status::NotSupported("not support this type!"); + }; + virtual std::unordered_map get_name_to_type() { std::unordered_map map; return map; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index ef35f5adf21819..92ffe14fa11ce9 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -158,7 +158,7 @@ class RowGroupReader { int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; } ParquetColumnReader::Statistics statistics(); - + int64_t get__remaining_rows(){ return _remaining_rows;} private: void _merge_read_ranges(std::vector& row_ranges); Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index ed4aa6e6ae3777..f7fee477abf3aa 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -511,6 +511,35 @@ Status ParquetReader::get_columns(std::unordered_mapnum_rows , because for the same file, + // the optimizer may generate multiple VFileScanner with different _scan_range + while (_read_row_groups.size() > 0) { + _next_row_group_reader(); + rows+=_current_group_reader->get__remaining_rows(); + } + + //fill one column is enough + auto cols = block->mutate_columns(); + for(auto& col:cols) { + col->resize(rows); + break; + } + + *read_rows = rows; + _current_group_reader.reset(nullptr); + _row_group_eof = true; + *eof = true; + return Status::OK(); +} + Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { if (_current_group_reader == nullptr || _row_group_eof) { if (_read_row_groups.size() > 0) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 63f760abcd3300..a0133a593f7162 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -119,6 +119,7 @@ class ParquetReader : public GenericReader { Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + Status get_next_block(Block* block, size_t* read_rows, bool* eof,TPushAggOp::type push_down_agg_type_opt) override; void close(); RowRange get_whole_range() { return _whole_range; } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index f12533431f6fa8..6eb98e8dbfd120 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -249,9 +249,8 @@ Status NewOlapScanNode::_process_conjuncts() { } Status NewOlapScanNode::_build_key_ranges_and_filters() { - if (!_olap_scan_node.__isset.push_down_agg_type_opt || - _olap_scan_node.push_down_agg_type_opt == TPushAggOp::NONE) { - const std::vector& column_names = _olap_scan_node.key_column_name; + if (push_down_agg_type_opt == TPushAggOp::NONE) { + const std::vector& column_names = _olap_scan_node.key_column_name; const std::vector& column_types = _olap_scan_node.key_column_type; DCHECK(column_types.size() == column_names.size()); @@ -328,7 +327,7 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() { } else { _runtime_profile->add_info_string( "PushDownAggregate", - push_down_agg_to_string(_olap_scan_node.push_down_agg_type_opt)); + push_down_agg_to_string(push_down_agg_type_opt)); } if (_state->enable_profile()) { diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 702d94c5f31885..d60b1cef11a14d 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -247,16 +247,28 @@ Status NewOlapScanner::_init_tablet_reader_params( const FilterPredicates& filter_predicates, const std::vector& function_filters) { // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty - const bool single_version = _tablet_reader_params.has_single_version(); - auto real_parent = reinterpret_cast(_parent); + bool single_version = + (_tablet_reader_params.rs_readers.size() == 1 && + _tablet_reader_params.rs_readers[0]->rowset()->start_version() == 0 && + !_tablet_reader_params.rs_readers[0] + ->rowset() + ->rowset_meta() + ->is_segments_overlapping()) || + (_tablet_reader_params.rs_readers.size() == 2 && + _tablet_reader_params.rs_readers[0]->rowset()->rowset_meta()->num_rows() == 0 && + _tablet_reader_params.rs_readers[1]->rowset()->start_version() == 2 && + !_tablet_reader_params.rs_readers[1] + ->rowset() + ->rowset_meta() + ->is_segments_overlapping()); if (_state->skip_storage_engine_merge()) { _tablet_reader_params.direct_mode = true; _aggregation = true; } else { _tablet_reader_params.direct_mode = _aggregation || single_version || - real_parent->_olap_scan_node.__isset.push_down_agg_type_opt; - } + (_parent->push_down_agg_type_opt != TPushAggOp::NONE); + } RETURN_IF_ERROR(_init_return_columns()); @@ -264,10 +276,9 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.tablet_schema = _tablet_schema; _tablet_reader_params.reader_type = ReaderType::READER_QUERY; _tablet_reader_params.aggregation = _aggregation; - if (real_parent->_olap_scan_node.__isset.push_down_agg_type_opt) { - _tablet_reader_params.push_down_agg_type_opt = - real_parent->_olap_scan_node.push_down_agg_type_opt; - } + if (_parent->push_down_agg_type_opt) { + _tablet_reader_params.push_down_agg_type_opt = _parent->push_down_agg_type_opt; + } _tablet_reader_params.version = Version(0, _version); // TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`, diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index cb829cfd5af26b..405227a12d24e3 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -49,6 +49,7 @@ #include "vec/exec/scan/new_olap_scanner.h" // IWYU pragma: keep #include "vec/exec/scan/scanner_context.h" #include "vec/exec/scan/vscanner.h" +#include "vec/exec/scan/vscan_node.h" #include "vfile_scanner.h" namespace doris::vectorized { diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 4413f3deaca3d8..e83c75bac1cb8a 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -245,7 +245,19 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo RETURN_IF_ERROR(_init_src_block(block)); { SCOPED_TIMER(_get_block_timer); + // Read next block. + + if ( _parent -> push_down_agg_type_opt != TPushAggOp::type ::NONE ){ + //Prevent FE misjudging the "select count/min/max ..." statement + if (Status::OK() == + _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof,_parent-> push_down_agg_type_opt )) + { + _cur_reader.reset(nullptr); + _cur_reader_eof=true; + return Status::OK(); + } + } // Some of column in block may not be filled (column not exist in file) RETURN_IF_ERROR( _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof)); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 66bfe6386d26fc..52221ab142f757 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -113,6 +113,7 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { } else { _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; } + push_down_agg_type_opt = tnode.push_down_agg_type_opt; return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 83e8909280d90d..af8e3066f38c2b 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -350,7 +350,8 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { std::unordered_map _colname_to_slot_id; std::vector _col_distribute_ids; - +public: + TPushAggOp::type push_down_agg_type_opt; private: Status _normalize_conjuncts(); Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java index 6df6b2f8174e30..65b2a88043ee5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java @@ -43,6 +43,8 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam; import org.apache.doris.nereids.trees.expressions.functions.agg.Count; import org.apache.doris.nereids.trees.expressions.functions.agg.GroupConcat; +import org.apache.doris.nereids.trees.expressions.functions.agg.Max; +import org.apache.doris.nereids.trees.expressions.functions.agg.Min; import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctCount; import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctSum; import org.apache.doris.nereids.trees.expressions.functions.agg.Sum; @@ -55,8 +57,10 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Project; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate; @@ -115,6 +119,20 @@ public List buildRules() { return storageLayerAggregate(agg, project, olapScan, ctx.cascadesContext); }) ), + RuleType.STORAGE_LAYER_AGGREGATE_WITH_PROJECT.build( + logicalAggregate( + logicalProject( + logicalFileScan() + ) + ) + .when(agg -> agg.isNormalized() && enablePushDownNoGroupAgg()) + .thenApply(ctx -> { + LogicalAggregate> agg = ctx.root; + LogicalProject project = agg.child(); + LogicalFileScan fileScan = project.child(); + return storageLayerAggregate(agg, project, fileScan, ctx.cascadesContext); + }) + ), RuleType.ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT.build( basePattern .when(agg -> agg.getDistinctArguments().size() == 0) @@ -325,6 +343,135 @@ private LogicalAggregate storageLayerAggregate( )); } } + /** + * sql: select count(*) from tbl + *

+ * before: + *

+ * LogicalAggregate(groupBy=[], output=[count(*)]) + * | + * LogicalFileScan(table=tbl) + *

+ * after: + *

+ * LogicalAggregate(groupBy=[], output=[count(*)]) + * | + * PhysicalStorageLayerAggregate(pushAggOp=COUNT, table=PhysicalFileScan(table=tbl)) + * + */ + + private LogicalAggregate storageLayerAggregate( + LogicalAggregate aggregate, + @Nullable LogicalProject project, + LogicalFileScan fileScan, CascadesContext cascadesContext) { + final LogicalAggregate canNotPush = aggregate; + List groupByExpressions = aggregate.getGroupByExpressions(); + if (!groupByExpressions.isEmpty() || !aggregate.getDistinctArguments().isEmpty()) { + return canNotPush; + } + + Set aggregateFunctions = aggregate.getAggregateFunctions(); + Set> functionClasses = aggregateFunctions + .stream() + .map(AggregateFunction::getClass) + .collect(Collectors.toSet()); + + Map, PushDownAggOp> supportedAgg = PushDownAggOp.supportedFunctions(); + if (!supportedAgg.keySet().containsAll(functionClasses)) { + return canNotPush; + } + + if (functionClasses.contains(Min.class) || functionClasses.contains(Max.class)) { + return canNotPush; + } + + if (aggregateFunctions.stream().anyMatch(fun -> fun.arity() > 1)) { + return canNotPush; + } + + // TODO: refactor this to process slot reference or expression together + boolean onlyContainsSlotOrNumericCastSlot = aggregateFunctions.stream() + .map(ExpressionTrait::getArguments) + .flatMap(List::stream) + .allMatch(argument -> { + if (argument instanceof SlotReference) { + return true; + } + if (argument instanceof Cast) { + return argument.child(0) instanceof SlotReference + && argument.getDataType().isNumericType() + && argument.child(0).getDataType().isNumericType(); + } + return false; + }); + if (!onlyContainsSlotOrNumericCastSlot) { + return canNotPush; + } + + // we already normalize the arguments to slotReference + List argumentsOfAggregateFunction = aggregateFunctions.stream() + .flatMap(aggregateFunction -> aggregateFunction.getArguments().stream()) + .collect(ImmutableList.toImmutableList()); + + if (project != null) { + argumentsOfAggregateFunction = Project.findProject( + (List) (List) argumentsOfAggregateFunction, project.getProjects()) + .stream() + .map(p -> p instanceof Alias ? p.child(0) : p) + .collect(ImmutableList.toImmutableList()); + } + + onlyContainsSlotOrNumericCastSlot = argumentsOfAggregateFunction + .stream() + .allMatch(argument -> { + if (argument instanceof SlotReference) { + return true; + } + if (argument instanceof Cast) { + return argument.child(0) instanceof SlotReference + && argument.getDataType().isNumericType() + && argument.child(0).getDataType().isNumericType(); + } + return false; + }); + if (!onlyContainsSlotOrNumericCastSlot) { + return canNotPush; + } + + Set pushDownAggOps = functionClasses.stream() + .map(supportedAgg::get) + .collect(Collectors.toSet()); + + PushDownAggOp aggOp = pushDownAggOps.iterator().next(); + + Set aggUsedSlots = + ExpressionUtils.collect(argumentsOfAggregateFunction, SlotReference.class::isInstance); + + List usedSlotInTable = (List) (List) Project.findProject(aggUsedSlots, + (List) (List) fileScan.getOutput()); + + for (SlotReference slot : usedSlotInTable) { + Column column = slot.getColumn().get(); + if (column.isAllowNull()) { + return canNotPush; + } + } + + PhysicalFileScan physicalfileScan = (PhysicalFileScan) new LogicalFileScanToPhysicalFileScan() + .build() + .transform(fileScan, cascadesContext) + .get(0); + if (project != null) { + return aggregate.withChildren(ImmutableList.of( + project.withChildren( + ImmutableList.of(new PhysicalStorageLayerAggregate(physicalfileScan, aggOp))) + )); + } else { + return aggregate.withChildren(ImmutableList.of( + new PhysicalStorageLayerAggregate(physicalfileScan, aggOp) + )); + } + } /** * sql: select count(*) from tbl group by id diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 6f10836a90f552..265c8972e43cfd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.CreateMaterializedViewStmt; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.InPredicate; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.PartitionNames; @@ -170,7 +171,6 @@ public class OlapScanNode extends ScanNode { private boolean useTopnOpt = false; - private TPushAggOp pushDownAggNoGroupingOp = null; // List of tablets will be scanned by current olap_scan_node private ArrayList scanTabletIds = Lists.newArrayList(); @@ -223,9 +223,6 @@ public void setIsPreAggregation(boolean isPreAggregation, String reason) { this.reasonOfPreAggregation + " " + reason; } - public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) { - this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp; - } public boolean isPreAggregation() { return isPreAggregation; @@ -1363,8 +1360,8 @@ protected void toThrift(TPlanNode msg) { msg.olap_scan_node.setTableName(olapTable.getName()); msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite()); - if (pushDownAggNoGroupingOp != null) { - msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); + if (pushDownAggNoGroupingOp != TPushAggOp.NONE) { + msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); } if (outputColumnUniqueIds != null) { @@ -1580,4 +1577,32 @@ public StatsDelta genStatsDelta() throws AnalysisException { olapTable.getId(), selectedIndexId == -1 ? olapTable.getBaseIndexId() : selectedIndexId, scanReplicaIds); } + + @Override + public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { + KeysType type = getOlapTable().getKeysType(); + if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) { + return false; + } + + String aggFunctionName = aggExpr.getFnName().getFunction(); + if (aggFunctionName.equalsIgnoreCase("COUNT") && type != KeysType.DUP_KEYS) { + return false; + } + + return true; + } + + @Override + public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) { + KeysType type = getOlapTable().getKeysType(); + + // The value column of the agg does not support zone_map index. + if (type == KeysType.AGG_KEYS && !col.isKey()) { + return false; + } + + return true; + } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index a279af676e3af2..e11f6748758437 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -27,11 +27,13 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprId; import org.apache.doris.analysis.ExprSubstitutionMap; +import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.FunctionName; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Function; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Type; @@ -46,6 +48,7 @@ import org.apache.doris.thrift.TFunctionBinaryType; import org.apache.doris.thrift.TPlan; import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPushAggOp; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -1175,4 +1178,18 @@ public void setConjuncts(Set exprs) { public void setCardinalityAfterFilter(long cardinalityAfterFilter) { this.cardinalityAfterFilter = cardinalityAfterFilter; } + + protected TPushAggOp pushDownAggNoGroupingOp = TPushAggOp.NONE; + + public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) { + this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp; + } + + public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { + return false; + } + + public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 61cb2d8cc7a67e..3bce97a0c433fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -56,7 +56,6 @@ import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FunctionSet; -import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.PrimitiveType; @@ -423,16 +422,6 @@ private TPushAggOp freshTPushAggOpByName(String functionName, TPushAggOp originA private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, Analyzer analyzer, PlanNode root) { do { - // TODO: Support other scan node in the future - if (!(root instanceof OlapScanNode)) { - break; - } - - KeysType type = ((OlapScanNode) root).getOlapTable().getKeysType(); - if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) { - break; - } - if (CollectionUtils.isNotEmpty(root.getConjuncts())) { break; } @@ -457,7 +446,6 @@ private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, boolean aggExprValidate = true; TPushAggOp aggOp = null; for (FunctionCallExpr aggExpr : aggExprs) { - // Only support `min`, `max`, `count` and `count` only effective in dup table String functionName = aggExpr.getFnName().getFunction(); if (!functionName.equalsIgnoreCase("MAX") && !functionName.equalsIgnoreCase("MIN") @@ -466,8 +454,7 @@ private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, break; } - if (functionName.equalsIgnoreCase("COUNT") - && type != KeysType.DUP_KEYS) { + if (!root.pushDownAggNoGrouping(aggExpr)) { aggExprValidate = false; break; } @@ -512,8 +499,7 @@ private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, continue; } - // The value column of the agg does not support zone_map index. - if (type == KeysType.AGG_KEYS && !col.isKey()) { + if (!root.pushDownAggNoGroupingCheckCol(aggExpr, col)) { returnColumnValidate = false; break; } @@ -556,8 +542,7 @@ private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, break; } - OlapScanNode olapNode = (OlapScanNode) root; - olapNode.setPushDownAggNoGrouping(aggOp); + root.setPushDownAggNoGrouping(aggOp); } while (false); } @@ -2210,7 +2195,7 @@ private PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, TableRef inne /** * Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef, - * TableValuedFunctionRef, CollectionTableRef or an InlineViewRef. + * CollectionTableRef or an InlineViewRef. *

* 'fastPartitionKeyScans' indicates whether to try to produce the slots with * metadata instead of table scans. Only applicable to BaseTableRef which is also diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java index 5af4a5f123e5c4..5e299acc1dcf96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java @@ -37,6 +37,7 @@ import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.Preconditions; @@ -76,6 +77,9 @@ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, St @Override protected void toThrift(TPlanNode planNode) { + if (pushDownAggNoGroupingOp != TPushAggOp.NONE) { + planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); + } planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE); TFileScanNode fileScanNode = new TFileScanNode(); fileScanNode.setTupleId(desc.getId().asInt()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 1fe115a0b9802c..a745be8dee53c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -17,6 +17,7 @@ package org.apache.doris.planner.external; +import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; @@ -67,13 +68,8 @@ public class HiveScanNode extends FileQueryScanNode { public static final String PROP_FIELD_DELIMITER = "field.delim"; public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01" - - public static final String PROP_LINE_DELIMITER = "line.delim"; public static final String DEFAULT_LINE_DELIMITER = "\n"; - public static final String PROP_ARRAY_DELIMITER_HIVE2 = "colelction.delim"; - public static final String PROP_ARRAY_DELIMITER_HIVE3 = "collection.delim"; - public static final String DEFAULT_ARRAY_DELIMITER = "\2"; protected final HMSExternalTable hmsTable; private HiveTransaction hiveTransaction = null; @@ -103,9 +99,9 @@ protected void doInitialize() throws UserException { String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); if (inputFormat.contains("TextInputFormat")) { for (SlotDescriptor slot : desc.getSlots()) { - if (slot.getType().isMapType() || slot.getType().isStructType()) { + if (!slot.getType().isScalarType()) { throw new UserException("For column `" + slot.getColumn().getName() - + "`, The column types MAP/STRUCT are not supported yet" + + "`, The column types ARRAY/MAP/STRUCT are not supported yet" + " for text input format of Hive. "); } } @@ -278,16 +274,9 @@ protected Map getLocationProperties() throws UserException { @Override protected TFileAttributes getFileAttributes() throws UserException { TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); - java.util.Map delimiter = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); - textParams.setColumnSeparator(delimiter.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER)); - textParams.setLineDelimiter(delimiter.getOrDefault(PROP_LINE_DELIMITER, DEFAULT_LINE_DELIMITER)); - if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE2) != null) { - textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE2)); - } else if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE3) != null) { - textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE3)); - } else { - textParams.setArrayDelimiter(DEFAULT_ARRAY_DELIMITER); - } + textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters() + .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER)); + textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER); TFileAttributes fileAttributes = new TFileAttributes(); fileAttributes.setTextParams(textParams); fileAttributes.setHeaderType(""); @@ -310,4 +299,28 @@ private void genSlotToSchemaIdMap() { } params.setSlotNameToSchemaPos(columnNameToPosition); } + + @Override + public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { + TFileFormatType fileFormatType; + try { + fileFormatType = getFileFormatType(); + } catch (UserException e) { + throw new RuntimeException(e); + } + + String aggFunctionName = aggExpr.getFnName().getFunction(); + if (aggFunctionName.equalsIgnoreCase("COUNT") && fileFormatType == TFileFormatType.FORMAT_PARQUET) { + return true; + } + return false; + } + + @Override + public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) { + if (col.isAllowNull()) { + return false; + } + return true; + } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 05f9f0972d5b83..bdbc077e0b0803 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -636,12 +636,11 @@ struct TOlapScanNode { // It's limit for scanner instead of scanNode so we add a new limit. 10: optional i64 sort_limit 11: optional bool enable_unique_key_merge_on_write - 12: optional TPushAggOp push_down_agg_type_opt - 13: optional bool use_topn_opt - 14: optional list indexes_desc - 15: optional set output_column_unique_ids - 16: optional list distribute_column_ids - 17: optional i32 schema_version + 12: optional bool use_topn_opt + 13: optional list indexes_desc + 14: optional set output_column_unique_ids + 15: optional list distribute_column_ids + 16: optional i32 schema_version } struct TEqJoinCondition { @@ -1142,6 +1141,8 @@ struct TPlanNode { 46: optional TNestedLoopJoinNode nested_loop_join_node 47: optional TTestExternalScanNode test_external_scan_node + 48: optional TPushAggOp push_down_agg_type_opt + 101: optional list projections 102: optional Types.TTupleId output_tuple_id 103: optional TPartitionSortNode partition_sort_node From e8a593f3380776cc1baca2baf34e49ebe4b13b33 Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Sat, 22 Jul 2023 21:06:21 +0800 Subject: [PATCH 02/10] [opt](hive)opt select count(*) stmt push down agg on hive --- be/src/vec/exec/format/generic_reader.h | 5 +++-- .../vec/exec/format/parquet/vparquet_group_reader.h | 3 ++- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 11 +++++------ be/src/vec/exec/format/parquet/vparquet_reader.h | 3 ++- be/src/vec/exec/scan/new_olap_scan_node.cpp | 7 +++---- be/src/vec/exec/scan/new_olap_scanner.cpp | 13 ++++++------- be/src/vec/exec/scan/scanner_scheduler.cpp | 2 +- be/src/vec/exec/scan/vfile_scanner.cpp | 10 +++++----- be/src/vec/exec/scan/vscan_node.h | 4 +++- 9 files changed, 30 insertions(+), 28 deletions(-) diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index 54206722ca9960..32ec94a3d30e4f 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -31,8 +31,9 @@ class Block; class GenericReader { public: virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0; - - virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof,TPushAggOp::type push_down_agg_type_opt) { + + virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof, + TPushAggOp::type push_down_agg_type_opt) { return Status::NotSupported("not support this type!"); }; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 92ffe14fa11ce9..dc97c76b290f2e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -158,7 +158,8 @@ class RowGroupReader { int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; } ParquetColumnReader::Statistics statistics(); - int64_t get__remaining_rows(){ return _remaining_rows;} + int64_t get__remaining_rows() { return _remaining_rows; } + private: void _merge_read_ranges(std::vector& row_ranges); Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index f7fee477abf3aa..148316c7030f90 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -511,24 +511,23 @@ Status ParquetReader::get_columns(std::unordered_mapnum_rows , because for the same file, // the optimizer may generate multiple VFileScanner with different _scan_range while (_read_row_groups.size() > 0) { _next_row_group_reader(); - rows+=_current_group_reader->get__remaining_rows(); + rows += _current_group_reader->get__remaining_rows(); } //fill one column is enough auto cols = block->mutate_columns(); - for(auto& col:cols) { + for (auto& col : cols) { col->resize(rows); break; } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index a0133a593f7162..67a546f0791c7d 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -119,7 +119,8 @@ class ParquetReader : public GenericReader { Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; - Status get_next_block(Block* block, size_t* read_rows, bool* eof,TPushAggOp::type push_down_agg_type_opt) override; + Status get_next_block(Block* block, size_t* read_rows, bool* eof, + TPushAggOp::type push_down_agg_type_opt) override; void close(); RowRange get_whole_range() { return _whole_range; } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 6eb98e8dbfd120..3ed14e9307f45b 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -250,7 +250,7 @@ Status NewOlapScanNode::_process_conjuncts() { Status NewOlapScanNode::_build_key_ranges_and_filters() { if (push_down_agg_type_opt == TPushAggOp::NONE) { - const std::vector& column_names = _olap_scan_node.key_column_name; + const std::vector& column_names = _olap_scan_node.key_column_name; const std::vector& column_types = _olap_scan_node.key_column_type; DCHECK(column_types.size() == column_names.size()); @@ -325,9 +325,8 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() { range); } } else { - _runtime_profile->add_info_string( - "PushDownAggregate", - push_down_agg_to_string(push_down_agg_type_opt)); + _runtime_profile->add_info_string("PushDownAggregate", + push_down_agg_to_string(push_down_agg_type_opt)); } if (_state->enable_profile()) { diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index d60b1cef11a14d..cdba8f09a4bf9b 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -265,10 +265,9 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.direct_mode = true; _aggregation = true; } else { - _tablet_reader_params.direct_mode = - _aggregation || single_version || - (_parent->push_down_agg_type_opt != TPushAggOp::NONE); - } + _tablet_reader_params.direct_mode = _aggregation || single_version || + (_parent->push_down_agg_type_opt != TPushAggOp::NONE); + } RETURN_IF_ERROR(_init_return_columns()); @@ -276,9 +275,9 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.tablet_schema = _tablet_schema; _tablet_reader_params.reader_type = ReaderType::READER_QUERY; _tablet_reader_params.aggregation = _aggregation; - if (_parent->push_down_agg_type_opt) { - _tablet_reader_params.push_down_agg_type_opt = _parent->push_down_agg_type_opt; - } + if (_parent->push_down_agg_type_opt) { + _tablet_reader_params.push_down_agg_type_opt = _parent->push_down_agg_type_opt; + } _tablet_reader_params.version = Version(0, _version); // TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`, diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 405227a12d24e3..1f8e2051bf65fa 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -48,8 +48,8 @@ #include "vec/core/block.h" #include "vec/exec/scan/new_olap_scanner.h" // IWYU pragma: keep #include "vec/exec/scan/scanner_context.h" -#include "vec/exec/scan/vscanner.h" #include "vec/exec/scan/vscan_node.h" +#include "vec/exec/scan/vscanner.h" #include "vfile_scanner.h" namespace doris::vectorized { diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index e83c75bac1cb8a..8b7e1d1f1b07d2 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -248,13 +248,13 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo // Read next block. - if ( _parent -> push_down_agg_type_opt != TPushAggOp::type ::NONE ){ + if (_parent->push_down_agg_type_opt != TPushAggOp::type ::NONE) { //Prevent FE misjudging the "select count/min/max ..." statement - if (Status::OK() == - _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof,_parent-> push_down_agg_type_opt )) - { + if (Status::OK() == _cur_reader->get_next_block(_src_block_ptr, &read_rows, + &_cur_reader_eof, + _parent->push_down_agg_type_opt)) { _cur_reader.reset(nullptr); - _cur_reader_eof=true; + _cur_reader_eof = true; return Status::OK(); } } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index af8e3066f38c2b..8b3ac347a0f993 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -350,8 +350,10 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { std::unordered_map _colname_to_slot_id; std::vector _col_distribute_ids; + public: - TPushAggOp::type push_down_agg_type_opt; + TPushAggOp::type push_down_agg_type_opt; + private: Status _normalize_conjuncts(); Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context, From 16312f01a0f69ece5bda3f2021b84b775e7e4629 Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Tue, 25 Jul 2023 00:56:11 +0800 Subject: [PATCH 03/10] [opt](hive)opt select count(*) stmt push down agg on hive --- be/src/vec/exec/format/generic_reader.h | 10 +- be/src/vec/exec/format/orc/vorc_reader.cpp | 19 ++ be/src/vec/exec/format/orc/vorc_reader.h | 3 + .../format/parquet/vparquet_group_reader.h | 3 +- .../exec/format/parquet/vparquet_reader.cpp | 46 ++-- .../vec/exec/format/parquet/vparquet_reader.h | 2 - be/src/vec/exec/scan/new_olap_scan_node.cpp | 4 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 6 +- be/src/vec/exec/scan/vfile_scanner.cpp | 12 +- be/src/vec/exec/scan/vscan_node.cpp | 10 +- be/src/vec/exec/scan/vscan_node.h | 4 +- .../implementation/AggregateStrategies.java | 211 +++++------------- .../apache/doris/planner/OlapScanNode.java | 12 +- .../doris/planner/external/FileScanNode.java | 6 +- .../doris/planner/external/HiveScanNode.java | 51 ++--- gensrc/thrift/PlanNodes.thrift | 11 +- 16 files changed, 156 insertions(+), 254 deletions(-) diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index 32ec94a3d30e4f..fedd476f43c05b 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -30,13 +30,12 @@ class Block; // a set of blocks with specified schema, class GenericReader { public: + GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {} + void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) { + _push_down_agg_type = push_down_agg_type; + } virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0; - virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof, - TPushAggOp::type push_down_agg_type_opt) { - return Status::NotSupported("not support this type!"); - }; - virtual std::unordered_map get_name_to_type() { std::unordered_map map; return map; @@ -71,6 +70,7 @@ class GenericReader { /// Whether the underlying FileReader has filled the partition&missing columns bool _fill_all_columns = false; + TPushAggOp::type _push_down_agg_type; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 77fd37e913f29f..5ca61f5d0ded85 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -236,6 +236,8 @@ Status OrcReader::_create_file_reader() { } catch (std::exception& e) { return Status::InternalError("Init OrcReader failed. reason = {}", e.what()); } + _remaining_rows = _reader->getNumberOfRows(); + return Status::OK(); } @@ -1370,6 +1372,23 @@ std::string OrcReader::_get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + if (_push_down_agg_type == TPushAggOp::type::COUNT) { + auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size); + + set_remaining_rows(get_remaining_rows() - rows); + + for (auto& col : block->mutate_columns()) { + col->resize(rows); + } + + *read_rows = rows; + if (get_remaining_rows() == 0) { + *eof = true; + } + *read_rows = rows; + return Status::OK(); + } + if (_lazy_read_ctx.can_lazy_read) { std::vector columns_to_filter; int column_to_keep = block->columns(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index b558f064658f01..2f5498b2cd8e78 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -482,8 +482,11 @@ class OrcReader : public GenericReader { const NullMap* null_map, orc::ColumnVectorBatch* cvb, const orc::Type* orc_column_typ); + int64_t get_remaining_rows() { return _remaining_rows; } + void set_remaining_rows(int64_t rows) { _remaining_rows = rows; } private: + int64_t _remaining_rows = 0; RuntimeProfile* _profile = nullptr; RuntimeState* _state = nullptr; const TFileScanRangeParams& _scan_params; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index dc97c76b290f2e..baa5912f990088 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -158,7 +158,8 @@ class RowGroupReader { int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; } ParquetColumnReader::Statistics statistics(); - int64_t get__remaining_rows() { return _remaining_rows; } + void set_remaining_rows(int64_t rows) { _remaining_rows = rows; } + int64_t get_remaining_rows() { return _remaining_rows; } private: void _merge_read_ranges(std::vector& row_ranges); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 148316c7030f90..9599ed70b4a741 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -511,34 +511,6 @@ Status ParquetReader::get_columns(std::unordered_mapnum_rows , because for the same file, - // the optimizer may generate multiple VFileScanner with different _scan_range - while (_read_row_groups.size() > 0) { - _next_row_group_reader(); - rows += _current_group_reader->get__remaining_rows(); - } - - //fill one column is enough - auto cols = block->mutate_columns(); - for (auto& col : cols) { - col->resize(rows); - break; - } - - *read_rows = rows; - _current_group_reader.reset(nullptr); - _row_group_eof = true; - *eof = true; - return Status::OK(); -} - Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { if (_current_group_reader == nullptr || _row_group_eof) { if (_read_row_groups.size() > 0) { @@ -552,6 +524,24 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) } } DCHECK(_current_group_reader != nullptr); + if (_push_down_agg_type == TPushAggOp::type::COUNT) { + auto rows = std::min(_current_group_reader->get_remaining_rows(), (int64_t)_batch_size); + + _current_group_reader->set_remaining_rows(_current_group_reader->get_remaining_rows() - + rows); + + for (auto& col : block->mutate_columns()) { + col->resize(rows); + } + + *read_rows = rows; + if (_current_group_reader->get_remaining_rows() == 0) { + _current_group_reader.reset(nullptr); + } + + return Status::OK(); + } + { SCOPED_RAW_TIMER(&_statistics.column_read_time); Status batch_st = diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 67a546f0791c7d..63f760abcd3300 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -119,8 +119,6 @@ class ParquetReader : public GenericReader { Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; - Status get_next_block(Block* block, size_t* read_rows, bool* eof, - TPushAggOp::type push_down_agg_type_opt) override; void close(); RowRange get_whole_range() { return _whole_range; } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 3ed14e9307f45b..7536dcb017b98b 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -249,7 +249,7 @@ Status NewOlapScanNode::_process_conjuncts() { } Status NewOlapScanNode::_build_key_ranges_and_filters() { - if (push_down_agg_type_opt == TPushAggOp::NONE) { + if (_push_down_agg_type == TPushAggOp::NONE) { const std::vector& column_names = _olap_scan_node.key_column_name; const std::vector& column_types = _olap_scan_node.key_column_type; DCHECK(column_types.size() == column_names.size()); @@ -326,7 +326,7 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() { } } else { _runtime_profile->add_info_string("PushDownAggregate", - push_down_agg_to_string(push_down_agg_type_opt)); + push_down_agg_to_string(_push_down_agg_type)); } if (_state->enable_profile()) { diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index cdba8f09a4bf9b..5610f4753983ed 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -266,7 +266,7 @@ Status NewOlapScanner::_init_tablet_reader_params( _aggregation = true; } else { _tablet_reader_params.direct_mode = _aggregation || single_version || - (_parent->push_down_agg_type_opt != TPushAggOp::NONE); + (_parent->get_push_down_agg_type() != TPushAggOp::NONE); } RETURN_IF_ERROR(_init_return_columns()); @@ -275,9 +275,7 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.tablet_schema = _tablet_schema; _tablet_reader_params.reader_type = ReaderType::READER_QUERY; _tablet_reader_params.aggregation = _aggregation; - if (_parent->push_down_agg_type_opt) { - _tablet_reader_params.push_down_agg_type_opt = _parent->push_down_agg_type_opt; - } + _tablet_reader_params.push_down_agg_type_opt = _parent->get_push_down_agg_type(); _tablet_reader_params.version = Version(0, _version); // TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 8b7e1d1f1b07d2..7d0f7fd33108c4 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -247,17 +247,6 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo SCOPED_TIMER(_get_block_timer); // Read next block. - - if (_parent->push_down_agg_type_opt != TPushAggOp::type ::NONE) { - //Prevent FE misjudging the "select count/min/max ..." statement - if (Status::OK() == _cur_reader->get_next_block(_src_block_ptr, &read_rows, - &_cur_reader_eof, - _parent->push_down_agg_type_opt)) { - _cur_reader.reset(nullptr); - _cur_reader_eof = true; - return Status::OK(); - } - } // Some of column in block may not be filled (column not exist in file) RETURN_IF_ERROR( _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof)); @@ -723,6 +712,7 @@ Status VFileScanner::_get_next_reader() { _name_to_col_type.clear(); _missing_cols.clear(); _cur_reader->get_columns(&_name_to_col_type, &_missing_cols); + _cur_reader->set_push_down_agg_type(_parent->get_push_down_agg_type()); RETURN_IF_ERROR(_generate_fill_columns()); if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { fmt::memory_buffer col_buf; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 52221ab142f757..28d47da52cc4f5 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -113,7 +113,15 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { } else { _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; } - push_down_agg_type_opt = tnode.push_down_agg_type_opt; + + if (tnode.__isset.push_down_agg_type_opt) { + _push_down_agg_type = tnode.push_down_agg_type_opt; + } else if (tnode.olap_scan_node.__isset.push_down_agg_type_opt) { + _push_down_agg_type = tnode.olap_scan_node.push_down_agg_type_opt; + + } else { + _push_down_agg_type = TPushAggOp::type::NONE; + } return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 8b3ac347a0f993..822ae8c1d7d3c3 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -135,6 +135,7 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { } } + TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } // Get next block. // If eos is true, no more data will be read and block should be empty. Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; @@ -351,8 +352,7 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { std::unordered_map _colname_to_slot_id; std::vector _col_distribute_ids; -public: - TPushAggOp::type push_down_agg_type_opt; + TPushAggOp::type _push_down_agg_type; private: Status _normalize_conjuncts(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java index 65b2a88043ee5a..3912315f4aec2b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java @@ -43,8 +43,6 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam; import org.apache.doris.nereids.trees.expressions.functions.agg.Count; import org.apache.doris.nereids.trees.expressions.functions.agg.GroupConcat; -import org.apache.doris.nereids.trees.expressions.functions.agg.Max; -import org.apache.doris.nereids.trees.expressions.functions.agg.Min; import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctCount; import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctSum; import org.apache.doris.nereids.trees.expressions.functions.agg.Sum; @@ -60,6 +58,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; @@ -125,13 +124,13 @@ public List buildRules() { logicalFileScan() ) ) - .when(agg -> agg.isNormalized() && enablePushDownNoGroupAgg()) - .thenApply(ctx -> { - LogicalAggregate> agg = ctx.root; - LogicalProject project = agg.child(); - LogicalFileScan fileScan = project.child(); - return storageLayerAggregate(agg, project, fileScan, ctx.cascadesContext); - }) + .when(agg -> agg.isNormalized() && enablePushDownNoGroupAgg()) + .thenApply(ctx -> { + LogicalAggregate> agg = ctx.root; + LogicalProject project = agg.child(); + LogicalFileScan fileScan = project.child(); + return storageLayerAggregate(agg, project, fileScan, ctx.cascadesContext); + }) ), RuleType.ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT.build( basePattern @@ -208,14 +207,19 @@ && couldConvertToMulti(agg)) private LogicalAggregate storageLayerAggregate( LogicalAggregate aggregate, @Nullable LogicalProject project, - LogicalOlapScan olapScan, CascadesContext cascadesContext) { + LogicalRelation logicalScan, CascadesContext cascadesContext) { final LogicalAggregate canNotPush = aggregate; - KeysType keysType = olapScan.getTable().getKeysType(); - if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS) { + if (!(logicalScan instanceof LogicalOlapScan) && !(logicalScan instanceof LogicalFileScan)) { return canNotPush; } + if (logicalScan instanceof LogicalOlapScan) { + KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType(); + if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS) { + return canNotPush; + } + } List groupByExpressions = aggregate.getGroupByExpressions(); if (!groupByExpressions.isEmpty() || !aggregate.getDistinctArguments().isEmpty()) { return canNotPush; @@ -231,8 +235,11 @@ private LogicalAggregate storageLayerAggregate( if (!supportedAgg.keySet().containsAll(functionClasses)) { return canNotPush; } - if (functionClasses.contains(Count.class) && keysType != KeysType.DUP_KEYS) { - return canNotPush; + if (logicalScan instanceof LogicalOlapScan) { + KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType(); + if (functionClasses.contains(Count.class) && keysType != KeysType.DUP_KEYS) { + return canNotPush; + } } if (aggregateFunctions.stream().anyMatch(fun -> fun.arity() > 1)) { return canNotPush; @@ -299,12 +306,15 @@ private LogicalAggregate storageLayerAggregate( ExpressionUtils.collect(argumentsOfAggregateFunction, SlotReference.class::isInstance); List usedSlotInTable = (List) (List) Project.findProject(aggUsedSlots, - (List) (List) olapScan.getOutput()); + (List) (List) logicalScan.getOutput()); for (SlotReference slot : usedSlotInTable) { Column column = slot.getColumn().get(); - if (keysType == KeysType.AGG_KEYS && !column.isKey()) { - return canNotPush; + if (logicalScan instanceof LogicalOlapScan) { + KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType(); + if (keysType == KeysType.AGG_KEYS && !column.isKey()) { + return canNotPush; + } } // The zone map max length of CharFamily is 512, do not // over the length: https://github.com/apache/doris/pull/6293 @@ -328,148 +338,41 @@ private LogicalAggregate storageLayerAggregate( } } - PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan() - .build() - .transform(olapScan, cascadesContext) - .get(0); - if (project != null) { - return aggregate.withChildren(ImmutableList.of( - project.withChildren( - ImmutableList.of(new PhysicalStorageLayerAggregate(physicalOlapScan, mergeOp))) - )); - } else { - return aggregate.withChildren(ImmutableList.of( - new PhysicalStorageLayerAggregate(physicalOlapScan, mergeOp) - )); - } - } - /** - * sql: select count(*) from tbl - *

- * before: - *

- * LogicalAggregate(groupBy=[], output=[count(*)]) - * | - * LogicalFileScan(table=tbl) - *

- * after: - *

- * LogicalAggregate(groupBy=[], output=[count(*)]) - * | - * PhysicalStorageLayerAggregate(pushAggOp=COUNT, table=PhysicalFileScan(table=tbl)) - * - */ - - private LogicalAggregate storageLayerAggregate( - LogicalAggregate aggregate, - @Nullable LogicalProject project, - LogicalFileScan fileScan, CascadesContext cascadesContext) { - final LogicalAggregate canNotPush = aggregate; - List groupByExpressions = aggregate.getGroupByExpressions(); - if (!groupByExpressions.isEmpty() || !aggregate.getDistinctArguments().isEmpty()) { - return canNotPush; - } - - Set aggregateFunctions = aggregate.getAggregateFunctions(); - Set> functionClasses = aggregateFunctions - .stream() - .map(AggregateFunction::getClass) - .collect(Collectors.toSet()); - - Map, PushDownAggOp> supportedAgg = PushDownAggOp.supportedFunctions(); - if (!supportedAgg.keySet().containsAll(functionClasses)) { - return canNotPush; - } - - if (functionClasses.contains(Min.class) || functionClasses.contains(Max.class)) { - return canNotPush; - } - - if (aggregateFunctions.stream().anyMatch(fun -> fun.arity() > 1)) { - return canNotPush; - } - - // TODO: refactor this to process slot reference or expression together - boolean onlyContainsSlotOrNumericCastSlot = aggregateFunctions.stream() - .map(ExpressionTrait::getArguments) - .flatMap(List::stream) - .allMatch(argument -> { - if (argument instanceof SlotReference) { - return true; - } - if (argument instanceof Cast) { - return argument.child(0) instanceof SlotReference - && argument.getDataType().isNumericType() - && argument.child(0).getDataType().isNumericType(); - } - return false; - }); - if (!onlyContainsSlotOrNumericCastSlot) { - return canNotPush; - } - - // we already normalize the arguments to slotReference - List argumentsOfAggregateFunction = aggregateFunctions.stream() - .flatMap(aggregateFunction -> aggregateFunction.getArguments().stream()) - .collect(ImmutableList.toImmutableList()); - - if (project != null) { - argumentsOfAggregateFunction = Project.findProject( - (List) (List) argumentsOfAggregateFunction, project.getProjects()) - .stream() - .map(p -> p instanceof Alias ? p.child(0) : p) - .collect(ImmutableList.toImmutableList()); - } - - onlyContainsSlotOrNumericCastSlot = argumentsOfAggregateFunction - .stream() - .allMatch(argument -> { - if (argument instanceof SlotReference) { - return true; - } - if (argument instanceof Cast) { - return argument.child(0) instanceof SlotReference - && argument.getDataType().isNumericType() - && argument.child(0).getDataType().isNumericType(); - } - return false; - }); - if (!onlyContainsSlotOrNumericCastSlot) { - return canNotPush; - } - - Set pushDownAggOps = functionClasses.stream() - .map(supportedAgg::get) - .collect(Collectors.toSet()); - - PushDownAggOp aggOp = pushDownAggOps.iterator().next(); - - Set aggUsedSlots = - ExpressionUtils.collect(argumentsOfAggregateFunction, SlotReference.class::isInstance); + if (logicalScan instanceof LogicalOlapScan) { + PhysicalOlapScan physicalScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan() + .build() + .transform((LogicalOlapScan) logicalScan, cascadesContext) + .get(0); - List usedSlotInTable = (List) (List) Project.findProject(aggUsedSlots, - (List) (List) fileScan.getOutput()); + if (project != null) { + return aggregate.withChildren(ImmutableList.of( + project.withChildren( + ImmutableList.of(new PhysicalStorageLayerAggregate(physicalScan, mergeOp))) + )); + } else { + return aggregate.withChildren(ImmutableList.of( + new PhysicalStorageLayerAggregate(physicalScan, mergeOp) + )); + } - for (SlotReference slot : usedSlotInTable) { - Column column = slot.getColumn().get(); - if (column.isAllowNull()) { - return canNotPush; + } else if (logicalScan instanceof LogicalFileScan) { + PhysicalFileScan physicalScan = (PhysicalFileScan) new LogicalFileScanToPhysicalFileScan() + .build() + .transform((LogicalFileScan) logicalScan, cascadesContext) + .get(0); + if (project != null) { + return aggregate.withChildren(ImmutableList.of( + project.withChildren( + ImmutableList.of(new PhysicalStorageLayerAggregate(physicalScan, mergeOp))) + )); + } else { + return aggregate.withChildren(ImmutableList.of( + new PhysicalStorageLayerAggregate(physicalScan, mergeOp) + )); } - } - PhysicalFileScan physicalfileScan = (PhysicalFileScan) new LogicalFileScanToPhysicalFileScan() - .build() - .transform(fileScan, cascadesContext) - .get(0); - if (project != null) { - return aggregate.withChildren(ImmutableList.of( - project.withChildren( - ImmutableList.of(new PhysicalStorageLayerAggregate(physicalfileScan, aggOp))) - )); } else { - return aggregate.withChildren(ImmutableList.of( - new PhysicalStorageLayerAggregate(physicalfileScan, aggOp) - )); + return canNotPush; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 265c8972e43cfd..8ab7fcf243aca3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -78,7 +78,6 @@ import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TPrimitiveType; -import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; @@ -101,6 +100,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -355,7 +355,8 @@ public void ignoreConjuncts(Expr whereExpr) { return; } Expr vconjunct = convertConjunctsToAndCompoundPredicate(conjuncts).replaceSubPredicate(whereExpr); - conjuncts = splitAndCompoundPredicateToConjuncts(vconjunct).stream().collect(Collectors.toList()); + conjuncts = splitAndCompoundPredicateToConjuncts(vconjunct).stream().filter(Objects::nonNull) + .collect(Collectors.toList()); } /** @@ -1360,9 +1361,10 @@ protected void toThrift(TPlanNode msg) { msg.olap_scan_node.setTableName(olapTable.getName()); msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite()); - if (pushDownAggNoGroupingOp != TPushAggOp.NONE) { - msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); - } + msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); + + msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); + // In TOlapScanNode , pushDownAggNoGroupingOp field is deprecated. if (outputColumnUniqueIds != null) { msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java index 5e299acc1dcf96..38be399244c109 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java @@ -37,7 +37,6 @@ import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; -import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.Preconditions; @@ -77,9 +76,8 @@ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, St @Override protected void toThrift(TPlanNode planNode) { - if (pushDownAggNoGroupingOp != TPushAggOp.NONE) { - planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); - } + planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); + planNode.setNodeType(TPlanNodeType.FILE_SCAN_NODE); TFileScanNode fileScanNode = new TFileScanNode(); fileScanNode.setTupleId(desc.getId().asInt()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index a745be8dee53c8..de034e54941ebf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -68,8 +68,13 @@ public class HiveScanNode extends FileQueryScanNode { public static final String PROP_FIELD_DELIMITER = "field.delim"; public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01" + public static final String PROP_LINE_DELIMITER = "line.delim"; public static final String DEFAULT_LINE_DELIMITER = "\n"; + public static final String PROP_ARRAY_DELIMITER_HIVE2 = "colelction.delim"; + public static final String PROP_ARRAY_DELIMITER_HIVE3 = "collection.delim"; + public static final String DEFAULT_ARRAY_DELIMITER = "\2"; + protected final HMSExternalTable hmsTable; private HiveTransaction hiveTransaction = null; @@ -99,14 +104,13 @@ protected void doInitialize() throws UserException { String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); if (inputFormat.contains("TextInputFormat")) { for (SlotDescriptor slot : desc.getSlots()) { - if (!slot.getType().isScalarType()) { + if (slot.getType().isMapType() || slot.getType().isStructType()) { throw new UserException("For column `" + slot.getColumn().getName() - + "`, The column types ARRAY/MAP/STRUCT are not supported yet" - + " for text input format of Hive. "); + + "`, The column types MAP/STRUCT are not supported yet" + + " for text input format of Hive. "); } } } - if (hmsTable.isHiveTransactionalTable()) { this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()), ConnectContext.get().getQualifiedUser(), hmsTable, hmsTable.isFullAcidTable()); @@ -146,12 +150,7 @@ protected List getPartitions() throws AnalysisException { ListPartitionItem listPartitionItem = (ListPartitionItem) idToPartitionItem.get(id); partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList()); } - List allPartitions = - cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), partitionValuesList); - if (ConnectContext.get().getExecutor() != null) { - ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionsFinishTime(); - } - return allPartitions; + return cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), partitionValuesList); } else { // unpartitioned table, create a dummy partition to save location and inputformat, // so that we can unify the interface. @@ -160,9 +159,6 @@ protected List getPartitions() throws AnalysisException { hmsTable.getRemoteTable().getSd().getLocation(), null); this.totalPartitionNum = 1; this.readPartitionNum = 1; - if (ConnectContext.get().getExecutor() != null) { - ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionsFinishTime(); - } return Lists.newArrayList(dummyPartition); } } @@ -195,9 +191,6 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List getLocationProperties() throws UserException { @Override protected TFileAttributes getFileAttributes() throws UserException { TFileTextScanRangeParams textParams = new TFileTextScanRangeParams(); - textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters() - .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER)); - textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER); + java.util.Map delimiter = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); + textParams.setColumnSeparator(delimiter.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER)); + textParams.setLineDelimiter(delimiter.getOrDefault(PROP_LINE_DELIMITER, DEFAULT_LINE_DELIMITER)); + if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE2) != null) { + textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE2)); + } else if (delimiter.get(PROP_ARRAY_DELIMITER_HIVE3) != null) { + textParams.setArrayDelimiter(delimiter.get(PROP_ARRAY_DELIMITER_HIVE3)); + } else { + textParams.setArrayDelimiter(DEFAULT_ARRAY_DELIMITER); + } TFileAttributes fileAttributes = new TFileAttributes(); fileAttributes.setTextParams(textParams); fileAttributes.setHeaderType(""); @@ -302,15 +302,9 @@ private void genSlotToSchemaIdMap() { @Override public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { - TFileFormatType fileFormatType; - try { - fileFormatType = getFileFormatType(); - } catch (UserException e) { - throw new RuntimeException(e); - } String aggFunctionName = aggExpr.getFnName().getFunction(); - if (aggFunctionName.equalsIgnoreCase("COUNT") && fileFormatType == TFileFormatType.FORMAT_PARQUET) { + if (aggFunctionName.equalsIgnoreCase("COUNT")) { return true; } return false; @@ -318,9 +312,6 @@ public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { @Override public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) { - if (col.isAllowNull()) { - return false; - } - return true; + return !col.isAllowNull(); } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index bdbc077e0b0803..9318b593a397cb 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -636,11 +636,12 @@ struct TOlapScanNode { // It's limit for scanner instead of scanNode so we add a new limit. 10: optional i64 sort_limit 11: optional bool enable_unique_key_merge_on_write - 12: optional bool use_topn_opt - 13: optional list indexes_desc - 14: optional set output_column_unique_ids - 15: optional list distribute_column_ids - 16: optional i32 schema_version + 12: optional TPushAggOp push_down_agg_type_opt //Deprecated + 13: optional bool use_topn_opt + 14: optional list indexes_desc + 15: optional set output_column_unique_ids + 16: optional list distribute_column_ids + 17: optional i32 schema_version } struct TEqJoinCondition { From 686fabc7ab7da85795bbb9283c407dd08c4026d3 Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Tue, 25 Jul 2023 01:24:24 +0800 Subject: [PATCH 04/10] [opt](hive)opt select count(*) stmt push down agg on hive --- .../java/org/apache/doris/planner/OlapScanNode.java | 4 +--- .../apache/doris/planner/external/HiveScanNode.java | 13 ++++++++++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 8ab7fcf243aca3..7b3e95c3a0b544 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -100,7 +100,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -355,8 +354,7 @@ public void ignoreConjuncts(Expr whereExpr) { return; } Expr vconjunct = convertConjunctsToAndCompoundPredicate(conjuncts).replaceSubPredicate(whereExpr); - conjuncts = splitAndCompoundPredicateToConjuncts(vconjunct).stream().filter(Objects::nonNull) - .collect(Collectors.toList()); + conjuncts = splitAndCompoundPredicateToConjuncts(vconjunct).stream().collect(Collectors.toList()); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index de034e54941ebf..f43a6b1dce95e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -150,7 +150,12 @@ protected List getPartitions() throws AnalysisException { ListPartitionItem listPartitionItem = (ListPartitionItem) idToPartitionItem.get(id); partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList()); } - return cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), partitionValuesList); + List allPartitions = + cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(), partitionValuesList); + if (ConnectContext.get().getExecutor() != null) { + ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionsFinishTime(); + } + return allPartitions; } else { // unpartitioned table, create a dummy partition to save location and inputformat, // so that we can unify the interface. @@ -159,6 +164,9 @@ protected List getPartitions() throws AnalysisException { hmsTable.getRemoteTable().getSd().getLocation(), null); this.totalPartitionNum = 1; this.readPartitionNum = 1; + if (ConnectContext.get().getExecutor() != null) { + ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionsFinishTime(); + } return Lists.newArrayList(dummyPartition); } } @@ -191,6 +199,9 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List Date: Wed, 26 Jul 2023 00:41:00 +0800 Subject: [PATCH 05/10] [opt](hive)opt select count(*) stmt push down agg on hive --- be/src/vec/exec/format/orc/vorc_reader.cpp | 1 - be/src/vec/exec/scan/vscan_node.cpp | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 5ca61f5d0ded85..0de2eb6c9b8fbb 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -1385,7 +1385,6 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { if (get_remaining_rows() == 0) { *eof = true; } - *read_rows = rows; return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 28d47da52cc4f5..8381603cfacfe3 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -114,6 +114,10 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; } + // tnode.olap_scan_node.push_down_agg_type_opt field is deprecated + // Introduced a new field : tnode.push_down_agg_type_opt + // + // make it compatible here if (tnode.__isset.push_down_agg_type_opt) { _push_down_agg_type = tnode.push_down_agg_type_opt; } else if (tnode.olap_scan_node.__isset.push_down_agg_type_opt) { From ca0f80fee28a005fb93a23ef874a8ceb3f5a4e9a Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Wed, 26 Jul 2023 11:22:35 +0800 Subject: [PATCH 06/10] [opt](hive)opt select count(*) stmt push down agg on hive --- be/src/vec/exec/scan/vscan_node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 8381603cfacfe3..679edf5370676d 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -115,7 +115,7 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { } // tnode.olap_scan_node.push_down_agg_type_opt field is deprecated - // Introduced a new field : tnode.push_down_agg_type_opt + // Introduced a new field : tnode.push_down_agg_type_opt // // make it compatible here if (tnode.__isset.push_down_agg_type_opt) { From 886440d28b1cfb6cf1cbba0b2c4b6b587b37e362 Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Wed, 26 Jul 2023 11:27:45 +0800 Subject: [PATCH 07/10] [opt](hive)opt select count(*) stmt push down agg on parquet in hive . --- be/src/vec/exec/scan/new_olap_scanner.cpp | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 5610f4753983ed..6299053ec7d48a 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -247,21 +247,9 @@ Status NewOlapScanner::_init_tablet_reader_params( const FilterPredicates& filter_predicates, const std::vector& function_filters) { // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty - bool single_version = - (_tablet_reader_params.rs_readers.size() == 1 && - _tablet_reader_params.rs_readers[0]->rowset()->start_version() == 0 && - !_tablet_reader_params.rs_readers[0] - ->rowset() - ->rowset_meta() - ->is_segments_overlapping()) || - (_tablet_reader_params.rs_readers.size() == 2 && - _tablet_reader_params.rs_readers[0]->rowset()->rowset_meta()->num_rows() == 0 && - _tablet_reader_params.rs_readers[1]->rowset()->start_version() == 2 && - !_tablet_reader_params.rs_readers[1] - ->rowset() - ->rowset_meta() - ->is_segments_overlapping()); - if (_state->skip_storage_engine_merge()) { + const bool single_version = _tablet_reader_params.has_single_version(); + + if (_state->skip_storage_engine_merge()) { _tablet_reader_params.direct_mode = true; _aggregation = true; } else { From 83fc5dc7ea242e7b662c4b48e728e1d1379cc030 Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Wed, 26 Jul 2023 14:56:57 +0800 Subject: [PATCH 08/10] [opt](hive)opt select count(*) stmt push down agg on parquet in hive . --- be/src/vec/exec/scan/new_olap_scanner.cpp | 2 +- regression-test/conf/regression-conf.groovy | 10 +- .../hive/test_select_count_optimize.out | 157 ++++++++++++++++++ .../hive/test_select_count_optimize.groovy | 91 ++++++++++ 4 files changed, 254 insertions(+), 6 deletions(-) create mode 100644 regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out create mode 100644 regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 6299053ec7d48a..51864acb24d869 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -249,7 +249,7 @@ Status NewOlapScanner::_init_tablet_reader_params( // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty const bool single_version = _tablet_reader_params.has_single_version(); - if (_state->skip_storage_engine_merge()) { + if (_state->skip_storage_engine_merge()) { _tablet_reader_params.direct_mode = true; _aggregation = true; } else { diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index fe06831e594813..bee7520001aaca 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -24,8 +24,8 @@ defaultDb = "regression_test" // init cmd like: select @@session.tx_read_only // at each time we connect. // add allowLoadLocalInfile so that the jdbc can execute mysql load data from client. -jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" -targetJdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" +jdbcUrl = "jdbc:mysql://127.0.0.1:55557/?useLocalSessionState=true&allowLoadLocalInfile=true" +targetJdbcUrl = "jdbc:mysql://127.0.0.1:55557/?useLocalSessionState=true&allowLoadLocalInfile=true" jdbcUser = "root" jdbcPassword = "" @@ -34,7 +34,7 @@ feTargetThriftAddress = "127.0.0.1:9020" feSyncerUser = "root" feSyncerPassword = "" -feHttpAddress = "127.0.0.1:8030" +feHttpAddress = "127.0.0.1:55555" feHttpUser = "root" feHttpPassword = "" @@ -101,8 +101,8 @@ es_8_port=39200 //hive catalog test config for bigdata -enableExternalHiveTest = false -extHiveHmsHost = "***.**.**.**" +enableExternalHiveTest = true +extHiveHmsHost = "172.21.16.47" extHiveHmsPort = 7004 extHdfsPort = 4007 extHiveHmsUser = "****" diff --git a/regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out b/regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out new file mode 100644 index 00000000000000..9f76685f652d96 --- /dev/null +++ b/regression-test/data/external_table_emr_p2/hive/test_select_count_optimize.out @@ -0,0 +1,157 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 ALGERIA 0 haggle. carefully final deposits detect slyly agai +1 ARGENTINA 1 al foxes promise slyly according to the regular accounts. bold requests alon +2 BRAZIL 1 y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special +3 CANADA 1 eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold +18 CHINA 2 c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos +4 EGYPT 4 y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d +5 ETHIOPIA 0 ven packages wake quickly. regu +6 FRANCE 3 refully final requests. regular, ironi +7 GERMANY 3 l platelets. regular accounts x-ray: unusual, regular acco +8 INDIA 2 ss excuses cajole slyly across the packages. deposits print aroun +9 INDONESIA 2 slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull +10 IRAN 4 efully alongside of the slyly final dependencies. +11 IRAQ 4 nic deposits boost atop the quickly final requests? quickly regula +12 JAPAN 2 ously. final, express gifts cajole a +13 JORDAN 4 ic deposits are blithely about the carefully regular pa +14 KENYA 0 pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t +15 MOROCCO 0 rns. blithely bold courts among the closely regular packages use furiously bold platelets? +16 MOZAMBIQUE 0 s. ironic, unusual asymptotes wake blithely r +17 PERU 1 platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun +19 ROMANIA 3 ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account +22 RUSSIA 3 requests against the platelets use never according to the quickly regular pint +20 SAUDI ARABIA 4 ts. silent requests haggle. closely express packages sleep across the blithely +23 UNITED KINGDOM 3 eans boost carefully special requests. accounts are. carefull +24 UNITED STATES 1 y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be +21 VIETNAM 2 hely enticingly express accounts. even, final + +-- !sql -- +25 + +-- !sql -- +25 + +-- !sql -- +0 + +-- !sql -- +5 + +-- !sql -- +4 + +-- !sql -- +0 + +-- !sql -- +5 +5 +5 +5 +5 + +-- !sql -- +5999989709 + +-- !sql -- +200000000 + +-- !sql -- +200000000 + +-- !sql -- +3995860 +3996114 +3997119 +3997177 +3997193 +3997623 +3998060 +3998197 +3998199 +3998205 +3998246 +3998259 +3998308 +3998860 +3998903 +3999137 +3999286 +3999411 +3999441 +3999477 +3999643 +3999670 +3999687 +3999830 +4000095 +4000151 +4000164 +4000268 +4000572 +4000594 +4000664 +4000672 +4000711 +4001091 +4001127 +4001273 +4001351 +4001463 +4001520 +4001568 +4001718 +4001940 +4001942 +4002064 +4002067 +4002305 +4002815 +4002966 +4003245 +4003749 + +-- !sql -- +3999286 + +-- !sql -- +210000000 + +-- !sql -- +1 + +-- !sql -- +200000000 + +-- !sql -- +200000000 + +-- !sql -- +3995860 +3996114 +3997119 + +-- !sql -- +210000000 + +-- !sql -- +ALGERIA +ARGENTINA + +-- !sql -- +25 + +-- !sql -- +0 + +-- !sql -- +1 + +-- !sql -- +5 +5 +5 +5 +5 + diff --git a/regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy b/regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy new file mode 100644 index 00000000000000..2a95dc4294c732 --- /dev/null +++ b/regression-test/suites/external_table_emr_p2/hive/test_select_count_optimize.groovy @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_select_count_optimize", "p2") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + String catalog_name = "test_select_count_optimize" + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'type'='hms', + 'hadoop.username' = 'hadoop', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + sql """ set query_timeout=3600; """ + + //parquet + qt_sql """ select * from tpch_1000_parquet.nation order by n_name,n_regionkey,n_nationkey,n_comment ; """ + + qt_sql """ select count(*) from tpch_1000_parquet.nation; """ + + qt_sql """ select count(1024) from tpch_1000_parquet.nation; """ + + qt_sql """ select count(null) from tpch_1000_parquet.nation; """ + + + qt_sql """ select count(*) from tpch_1000_parquet.nation where n_regionkey = 0; """ + + qt_sql """ select max(n_regionkey) from tpch_1000_parquet.nation ;""" + + qt_sql """ select min(n_regionkey) from tpch_1000_parquet.nation ; """ + + qt_sql """ select count(*) as a from tpch_1000_parquet.nation group by n_regionkey order by a; """ + + qt_sql """ select count(*) from tpch_1000_parquet.lineitem; """ + + qt_sql """ select count(*) from tpch_1000_parquet.part; """ + + qt_sql """ select count(p_partkey) from tpch_1000_parquet.part; """ + + qt_sql """ select count(*) as sz from tpch_1000_parquet.part group by p_size order by sz ;""" + + qt_sql """ select count(*) from tpch_1000_parquet.part where p_size = 1; """ + + qt_sql """ select count(*) from user_profile.hive_hll_user_profile_wide_table_parquet; """; + + //orc + qt_sql """ select count(*) from tpch_1000_orc.part where p_partkey=1; """ + + qt_sql """ select max(p_partkey) from tpch_1000_orc.part ; """ + + qt_sql """ select count(p_comment) from tpch_1000_orc.part; """ + + qt_sql """ select count(*) as a from tpch_1000_orc.part group by p_size order by a limit 3 ; """ + + qt_sql """ select count(*) from user_profile.hive_hll_user_profile_wide_table_orc ; """ ; + + //other + qt_sql """ select n_name from tpch_1000.nation order by n_name limit 2; """ + + qt_sql """ select count(*) from tpch_1000.nation; """ + + qt_sql """ select min(n_regionkey) from tpch_1000.nation ; """ + + qt_sql """ select count(*) from tpch_1000.nation where n_nationkey=5;""" + + qt_sql """ select count(*) as a from tpch_1000.nation group by n_regionkey order by a;""" + + } +} + From 28317dc1c8d71edf1cae467d7a6d2e70aef0027d Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Wed, 26 Jul 2023 15:11:27 +0800 Subject: [PATCH 09/10] [opt](hive)opt select count(*) stmt push down agg on parquet in hive . --- regression-test/conf/regression-conf.groovy | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index bee7520001aaca..fe06831e594813 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -24,8 +24,8 @@ defaultDb = "regression_test" // init cmd like: select @@session.tx_read_only // at each time we connect. // add allowLoadLocalInfile so that the jdbc can execute mysql load data from client. -jdbcUrl = "jdbc:mysql://127.0.0.1:55557/?useLocalSessionState=true&allowLoadLocalInfile=true" -targetJdbcUrl = "jdbc:mysql://127.0.0.1:55557/?useLocalSessionState=true&allowLoadLocalInfile=true" +jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" +targetJdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" jdbcUser = "root" jdbcPassword = "" @@ -34,7 +34,7 @@ feTargetThriftAddress = "127.0.0.1:9020" feSyncerUser = "root" feSyncerPassword = "" -feHttpAddress = "127.0.0.1:55555" +feHttpAddress = "127.0.0.1:8030" feHttpUser = "root" feHttpPassword = "" @@ -101,8 +101,8 @@ es_8_port=39200 //hive catalog test config for bigdata -enableExternalHiveTest = true -extHiveHmsHost = "172.21.16.47" +enableExternalHiveTest = false +extHiveHmsHost = "***.**.**.**" extHiveHmsPort = 7004 extHdfsPort = 4007 extHiveHmsUser = "****" From 0d9e4bf49442848ffacda562f785ee0da9e68db0 Mon Sep 17 00:00:00 2001 From: changyuwei <2017501503@qq.com> Date: Fri, 28 Jul 2023 11:10:40 +0800 Subject: [PATCH 10/10] [opt](hive)opt select count(*) stmt push down agg on parquet in hive . --- be/src/vec/exec/format/generic_reader.h | 2 ++ regression-test/data/performance_p0/redundant_conjuncts.out | 2 ++ 2 files changed, 4 insertions(+) diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index fedd476f43c05b..a712fb2847467e 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "common/factory_creator.h" #include "common/status.h" #include "runtime/types.h" diff --git a/regression-test/data/performance_p0/redundant_conjuncts.out b/regression-test/data/performance_p0/redundant_conjuncts.out index 3baa5b3d930b5e..f82e0c94536911 100644 --- a/regression-test/data/performance_p0/redundant_conjuncts.out +++ b/regression-test/data/performance_p0/redundant_conjuncts.out @@ -12,6 +12,7 @@ PLAN FRAGMENT 0 PREDICATES: `k1` = 1 partitions=0/1, tablets=0/0, tabletList= cardinality=0, avgRowSize=8.0, numNodes=1 + pushAggOp=NONE -- !redundant_conjuncts_gnerated_by_extract_common_filter -- PLAN FRAGMENT 0 @@ -26,4 +27,5 @@ PLAN FRAGMENT 0 PREDICATES: `k1` = 1 OR `k1` = 2 partitions=0/1, tablets=0/0, tabletList= cardinality=0, avgRowSize=8.0, numNodes=1 + pushAggOp=NONE