Skip to content

Commit

Permalink
[opt](hive)opt select count(*) stmt push down agg on parquet in hive . (
Browse files Browse the repository at this point in the history
apache#22115)

Optimization: for a "select count(*) from table" statement, push the "count" aggregate down to the BE.
Supported file types: Parquet and ORC in Hive.

1. 4k files, 60kw rows
    before:  1 min 37.70 sec
    after:   50.18 sec

2. 50 files, 60kw rows
    before: 1.12 sec
    after: 0.82 sec
  • Loading branch information
hubgeter authored and xiaokang committed Aug 8, 2023
1 parent f502df2 commit e6befff
Show file tree
Hide file tree
Showing 21 changed files with 471 additions and 63 deletions.
8 changes: 8 additions & 0 deletions be/src/vec/exec/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <gen_cpp/PlanNodes_types.h>

#include "common/factory_creator.h"
#include "common/status.h"
#include "runtime/types.h"
Expand All @@ -30,7 +32,12 @@ class Block;
// a set of blocks with specified schema,
class GenericReader {
public:
// Default-constructs the reader with aggregate push-down disabled (NONE).
GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {}
// Records which aggregate op (e.g. COUNT) the planner pushed down to this
// reader; concrete readers check _push_down_agg_type in get_next_block().
void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
_push_down_agg_type = push_down_agg_type;
}
virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;

virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() {
std::unordered_map<std::string, TypeDescriptor> map;
return map;
Expand Down Expand Up @@ -67,6 +74,7 @@ class GenericReader {

/// Whether the underlying FileReader has filled the partition&missing columns
bool _fill_all_columns = false;
TPushAggOp::type _push_down_agg_type;
};

} // namespace doris::vectorized
18 changes: 18 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ Status OrcReader::_create_file_reader() {
} catch (std::exception& e) {
return Status::InternalError("Init OrcReader failed. reason = {}", e.what());
}
_remaining_rows = _reader->getNumberOfRows();

return Status::OK();
}

Expand Down Expand Up @@ -1373,6 +1375,22 @@ std::string OrcReader::_get_field_name_lower_case(const orc::Type* orc_type, int
}

Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size);

set_remaining_rows(get_remaining_rows() - rows);

for (auto& col : block->mutate_columns()) {
col->resize(rows);
}

*read_rows = rows;
if (get_remaining_rows() == 0) {
*eof = true;
}
return Status::OK();
}

if (_lazy_read_ctx.can_lazy_read) {
std::vector<uint32_t> columns_to_filter;
int column_to_keep = block->columns();
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,11 @@ class OrcReader : public GenericReader {
const NullMap* null_map,
orc::ColumnVectorBatch* cvb,
const orc::Type* orc_column_typ);
// Rows not yet consumed by the COUNT push-down fast path. Initialized from
// _reader->getNumberOfRows() when the ORC file reader is created, and
// decremented batch by batch in get_next_block().
int64_t get_remaining_rows() { return _remaining_rows; }
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }

private:
int64_t _remaining_rows = 0;
RuntimeProfile* _profile = nullptr;
RuntimeState* _state = nullptr;
const TFileScanRangeParams& _scan_params;
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class RowGroupReader {
int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }

ParquetColumnReader::Statistics statistics();
// Remaining row count of this row group for the COUNT(*) push-down fast path;
// ParquetReader::get_next_block() decrements it as it emits resized batches
// and drops the group reader when it reaches zero.
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
int64_t get_remaining_rows() { return _remaining_rows; }

private:
void _merge_read_ranges(std::vector<RowRange>& row_ranges);
Expand Down
18 changes: 18 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,24 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
}
}
DCHECK(_current_group_reader != nullptr);
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
auto rows = std::min(_current_group_reader->get_remaining_rows(), (int64_t)_batch_size);

_current_group_reader->set_remaining_rows(_current_group_reader->get_remaining_rows() -
rows);

for (auto& col : block->mutate_columns()) {
col->resize(rows);
}

*read_rows = rows;
if (_current_group_reader->get_remaining_rows() == 0) {
_current_group_reader.reset(nullptr);
}

return Status::OK();
}

{
SCOPED_RAW_TIMER(&_statistics.column_read_time);
Status batch_st =
Expand Down
8 changes: 3 additions & 5 deletions be/src/vec/exec/scan/new_olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ Status NewOlapScanNode::_process_conjuncts() {
}

Status NewOlapScanNode::_build_key_ranges_and_filters() {
if (!_olap_scan_node.__isset.push_down_agg_type_opt ||
_olap_scan_node.push_down_agg_type_opt == TPushAggOp::NONE) {
if (_push_down_agg_type == TPushAggOp::NONE) {
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
DCHECK(column_types.size() == column_names.size());
Expand Down Expand Up @@ -326,9 +325,8 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() {
range);
}
} else {
_runtime_profile->add_info_string(
"PushDownAggregate",
push_down_agg_to_string(_olap_scan_node.push_down_agg_type_opt));
_runtime_profile->add_info_string("PushDownAggregate",
push_down_agg_to_string(_push_down_agg_type));
}

if (_state->enable_profile()) {
Expand Down
12 changes: 4 additions & 8 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,13 @@ Status NewOlapScanner::_init_tablet_reader_params(
const std::vector<FunctionFilter>& function_filters) {
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
const bool single_version = _tablet_reader_params.has_single_version();
auto real_parent = reinterpret_cast<NewOlapScanNode*>(_parent);

if (_state->skip_storage_engine_merge()) {
_tablet_reader_params.direct_mode = true;
_aggregation = true;
} else {
_tablet_reader_params.direct_mode =
_aggregation || single_version ||
real_parent->_olap_scan_node.__isset.push_down_agg_type_opt;
_tablet_reader_params.direct_mode = _aggregation || single_version ||
(_parent->get_push_down_agg_type() != TPushAggOp::NONE);
}

RETURN_IF_ERROR(_init_return_columns());
Expand All @@ -264,10 +263,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.tablet_schema = _tablet_schema;
_tablet_reader_params.reader_type = ReaderType::READER_QUERY;
_tablet_reader_params.aggregation = _aggregation;
if (real_parent->_olap_scan_node.__isset.push_down_agg_type_opt) {
_tablet_reader_params.push_down_agg_type_opt =
real_parent->_olap_scan_node.push_down_agg_type_opt;
}
_tablet_reader_params.push_down_agg_type_opt = _parent->get_push_down_agg_type();
_tablet_reader_params.version = Version(0, _version);

// TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`,
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "vec/core/block.h"
#include "vec/exec/scan/new_olap_scanner.h" // IWYU pragma: keep
#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exec/scan/vscanner.h"
#include "vfile_scanner.h"

Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
RETURN_IF_ERROR(_init_src_block(block));
{
SCOPED_TIMER(_get_block_timer);

// Read next block.
// Some of column in block may not be filled (column not exist in file)
RETURN_IF_ERROR(
Expand Down Expand Up @@ -737,6 +738,7 @@ Status VFileScanner::_get_next_reader() {
_name_to_col_type.clear();
_missing_cols.clear();
_cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
_cur_reader->set_push_down_agg_type(_parent->get_push_down_agg_type());
RETURN_IF_ERROR(_generate_fill_columns());
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
Expand Down
13 changes: 13 additions & 0 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
} else {
_max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
}

// tnode.olap_scan_node.push_down_agg_type_opt field is deprecated
// Introduced a new field : tnode.push_down_agg_type_opt
//
// make it compatible here
if (tnode.__isset.push_down_agg_type_opt) {
_push_down_agg_type = tnode.push_down_agg_type_opt;
} else if (tnode.olap_scan_node.__isset.push_down_agg_type_opt) {
_push_down_agg_type = tnode.olap_scan_node.push_down_agg_type_opt;

} else {
_push_down_agg_type = TPushAggOp::type::NONE;
}
return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/scan/vscan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
}
}

// Aggregate op pushed down to this scan node. Resolved in VScanNode::init()
// from tnode.push_down_agg_type_opt, falling back to the deprecated
// tnode.olap_scan_node.push_down_agg_type_opt, else NONE.
TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
// Get next block.
// If eos is true, no more data will be read and block should be empty.
Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
Expand Down Expand Up @@ -351,6 +352,8 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
std::unordered_map<std::string, int> _colname_to_slot_id;
std::vector<int> _col_distribute_ids;

TPushAggOp::type _push_down_agg_type;

private:
Status _normalize_conjuncts();
Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
Expand Down Expand Up @@ -115,6 +118,20 @@ public List<Rule> buildRules() {
return storageLayerAggregate(agg, project, olapScan, ctx.cascadesContext);
})
),
RuleType.STORAGE_LAYER_AGGREGATE_WITH_PROJECT.build(
logicalAggregate(
logicalProject(
logicalFileScan()
)
)
.when(agg -> agg.isNormalized() && enablePushDownNoGroupAgg())
.thenApply(ctx -> {
LogicalAggregate<LogicalProject<LogicalFileScan>> agg = ctx.root;
LogicalProject<LogicalFileScan> project = agg.child();
LogicalFileScan fileScan = project.child();
return storageLayerAggregate(agg, project, fileScan, ctx.cascadesContext);
})
),
RuleType.ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT.build(
basePattern
.when(agg -> agg.getDistinctArguments().size() == 0)
Expand Down Expand Up @@ -190,14 +207,19 @@ && couldConvertToMulti(agg))
private LogicalAggregate<? extends Plan> storageLayerAggregate(
LogicalAggregate<? extends Plan> aggregate,
@Nullable LogicalProject<? extends Plan> project,
LogicalOlapScan olapScan, CascadesContext cascadesContext) {
LogicalRelation logicalScan, CascadesContext cascadesContext) {
final LogicalAggregate<? extends Plan> canNotPush = aggregate;

KeysType keysType = olapScan.getTable().getKeysType();
if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS) {
if (!(logicalScan instanceof LogicalOlapScan) && !(logicalScan instanceof LogicalFileScan)) {
return canNotPush;
}

if (logicalScan instanceof LogicalOlapScan) {
KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType();
if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS) {
return canNotPush;
}
}
List<Expression> groupByExpressions = aggregate.getGroupByExpressions();
if (!groupByExpressions.isEmpty() || !aggregate.getDistinctArguments().isEmpty()) {
return canNotPush;
Expand All @@ -213,8 +235,11 @@ private LogicalAggregate<? extends Plan> storageLayerAggregate(
if (!supportedAgg.keySet().containsAll(functionClasses)) {
return canNotPush;
}
if (functionClasses.contains(Count.class) && keysType != KeysType.DUP_KEYS) {
return canNotPush;
if (logicalScan instanceof LogicalOlapScan) {
KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType();
if (functionClasses.contains(Count.class) && keysType != KeysType.DUP_KEYS) {
return canNotPush;
}
}
if (aggregateFunctions.stream().anyMatch(fun -> fun.arity() > 1)) {
return canNotPush;
Expand Down Expand Up @@ -281,12 +306,15 @@ private LogicalAggregate<? extends Plan> storageLayerAggregate(
ExpressionUtils.collect(argumentsOfAggregateFunction, SlotReference.class::isInstance);

List<SlotReference> usedSlotInTable = (List<SlotReference>) (List) Project.findProject(aggUsedSlots,
(List<NamedExpression>) (List) olapScan.getOutput());
(List<NamedExpression>) (List) logicalScan.getOutput());

for (SlotReference slot : usedSlotInTable) {
Column column = slot.getColumn().get();
if (keysType == KeysType.AGG_KEYS && !column.isKey()) {
return canNotPush;
if (logicalScan instanceof LogicalOlapScan) {
KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType();
if (keysType == KeysType.AGG_KEYS && !column.isKey()) {
return canNotPush;
}
}
// The zone map max length of CharFamily is 512, do not
// over the length: https://github.com/apache/doris/pull/6293
Expand All @@ -310,19 +338,41 @@ private LogicalAggregate<? extends Plan> storageLayerAggregate(
}
}

PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan()
.build()
.transform(olapScan, cascadesContext)
.get(0);
if (project != null) {
return aggregate.withChildren(ImmutableList.of(
if (logicalScan instanceof LogicalOlapScan) {
PhysicalOlapScan physicalScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan()
.build()
.transform((LogicalOlapScan) logicalScan, cascadesContext)
.get(0);

if (project != null) {
return aggregate.withChildren(ImmutableList.of(
project.withChildren(
ImmutableList.of(new PhysicalStorageLayerAggregate(physicalOlapScan, mergeOp)))
));
ImmutableList.of(new PhysicalStorageLayerAggregate(physicalScan, mergeOp)))
));
} else {
return aggregate.withChildren(ImmutableList.of(
new PhysicalStorageLayerAggregate(physicalScan, mergeOp)
));
}

} else if (logicalScan instanceof LogicalFileScan) {
PhysicalFileScan physicalScan = (PhysicalFileScan) new LogicalFileScanToPhysicalFileScan()
.build()
.transform((LogicalFileScan) logicalScan, cascadesContext)
.get(0);
if (project != null) {
return aggregate.withChildren(ImmutableList.of(
project.withChildren(
ImmutableList.of(new PhysicalStorageLayerAggregate(physicalScan, mergeOp)))
));
} else {
return aggregate.withChildren(ImmutableList.of(
new PhysicalStorageLayerAggregate(physicalScan, mergeOp)
));
}

} else {
return aggregate.withChildren(ImmutableList.of(
new PhysicalStorageLayerAggregate(physicalOlapScan, mergeOp)
));
return canNotPush;
}
}

Expand Down
Loading

0 comments on commit e6befff

Please sign in to comment.