Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](hive) Push down `select count(*)` aggregation to Parquet/ORC scans in Hive #22115

Merged
merged 10 commits into from
Jul 28, 2023
Merged
8 changes: 8 additions & 0 deletions be/src/vec/exec/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <gen_cpp/PlanNodes_types.h>

#include "common/factory_creator.h"
#include "common/status.h"
#include "runtime/types.h"
Expand All @@ -30,7 +32,12 @@ class Block;
// a set of blocks with specified schema,
class GenericReader {
public:
GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {}
void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
_push_down_agg_type = push_down_agg_type;
}
virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;

virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() {
std::unordered_map<std::string, TypeDescriptor> map;
return map;
Expand Down Expand Up @@ -65,6 +72,7 @@ class GenericReader {

/// Whether the underlying FileReader has filled the partition&missing columns
bool _fill_all_columns = false;
TPushAggOp::type _push_down_agg_type;
};

} // namespace doris::vectorized
18 changes: 18 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ Status OrcReader::_create_file_reader() {
} catch (std::exception& e) {
return Status::InternalError("Init OrcReader failed. reason = {}", e.what());
}
_remaining_rows = _reader->getNumberOfRows();

return Status::OK();
}

Expand Down Expand Up @@ -1370,6 +1372,22 @@ std::string OrcReader::_get_field_name_lower_case(const orc::Type* orc_type, int
}

Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size);

set_remaining_rows(get_remaining_rows() - rows);

for (auto& col : block->mutate_columns()) {
col->resize(rows);
}

*read_rows = rows;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This duplicates the logic at line 1388.

if (get_remaining_rows() == 0) {
*eof = true;
}
return Status::OK();
}

if (_lazy_read_ctx.can_lazy_read) {
std::vector<uint32_t> columns_to_filter;
int column_to_keep = block->columns();
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,11 @@ class OrcReader : public GenericReader {
const NullMap* null_map,
orc::ColumnVectorBatch* cvb,
const orc::Type* orc_column_typ);
int64_t get_remaining_rows() { return _remaining_rows; }
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }

private:
int64_t _remaining_rows = 0;
RuntimeProfile* _profile = nullptr;
RuntimeState* _state = nullptr;
const TFileScanRangeParams& _scan_params;
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class RowGroupReader {
int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }

ParquetColumnReader::Statistics statistics();
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
int64_t get_remaining_rows() { return _remaining_rows; }

private:
void _merge_read_ranges(std::vector<RowRange>& row_ranges);
Expand Down
18 changes: 18 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,24 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
}
}
DCHECK(_current_group_reader != nullptr);
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
auto rows = std::min(_current_group_reader->get_remaining_rows(), (int64_t)_batch_size);

_current_group_reader->set_remaining_rows(_current_group_reader->get_remaining_rows() -
rows);

for (auto& col : block->mutate_columns()) {
col->resize(rows);
}

*read_rows = rows;
if (_current_group_reader->get_remaining_rows() == 0) {
_current_group_reader.reset(nullptr);
}

return Status::OK();
}

{
SCOPED_RAW_TIMER(&_statistics.column_read_time);
Status batch_st =
Expand Down
8 changes: 3 additions & 5 deletions be/src/vec/exec/scan/new_olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ Status NewOlapScanNode::_process_conjuncts() {
}

Status NewOlapScanNode::_build_key_ranges_and_filters() {
if (!_olap_scan_node.__isset.push_down_agg_type_opt ||
_olap_scan_node.push_down_agg_type_opt == TPushAggOp::NONE) {
if (_push_down_agg_type == TPushAggOp::NONE) {
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
DCHECK(column_types.size() == column_names.size());
Expand Down Expand Up @@ -326,9 +325,8 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() {
range);
}
} else {
_runtime_profile->add_info_string(
"PushDownAggregate",
push_down_agg_to_string(_olap_scan_node.push_down_agg_type_opt));
_runtime_profile->add_info_string("PushDownAggregate",
push_down_agg_to_string(_push_down_agg_type));
}

if (_state->enable_profile()) {
Expand Down
12 changes: 4 additions & 8 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,13 @@ Status NewOlapScanner::_init_tablet_reader_params(
const std::vector<FunctionFilter>& function_filters) {
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
const bool single_version = _tablet_reader_params.has_single_version();
auto real_parent = reinterpret_cast<NewOlapScanNode*>(_parent);

if (_state->skip_storage_engine_merge()) {
_tablet_reader_params.direct_mode = true;
_aggregation = true;
} else {
_tablet_reader_params.direct_mode =
_aggregation || single_version ||
real_parent->_olap_scan_node.__isset.push_down_agg_type_opt;
_tablet_reader_params.direct_mode = _aggregation || single_version ||
(_parent->get_push_down_agg_type() != TPushAggOp::NONE);
}

RETURN_IF_ERROR(_init_return_columns());
Expand All @@ -264,10 +263,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.tablet_schema = _tablet_schema;
_tablet_reader_params.reader_type = ReaderType::READER_QUERY;
_tablet_reader_params.aggregation = _aggregation;
if (real_parent->_olap_scan_node.__isset.push_down_agg_type_opt) {
_tablet_reader_params.push_down_agg_type_opt =
real_parent->_olap_scan_node.push_down_agg_type_opt;
}
_tablet_reader_params.push_down_agg_type_opt = _parent->get_push_down_agg_type();
_tablet_reader_params.version = Version(0, _version);

// TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`,
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "vec/core/block.h"
#include "vec/exec/scan/new_olap_scanner.h" // IWYU pragma: keep
#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exec/scan/vscanner.h"
#include "vfile_scanner.h"

Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
RETURN_IF_ERROR(_init_src_block(block));
{
SCOPED_TIMER(_get_block_timer);

// Read next block.
// Some of column in block may not be filled (column not exist in file)
RETURN_IF_ERROR(
Expand Down Expand Up @@ -711,6 +712,7 @@ Status VFileScanner::_get_next_reader() {
_name_to_col_type.clear();
_missing_cols.clear();
_cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
_cur_reader->set_push_down_agg_type(_parent->get_push_down_agg_type());
RETURN_IF_ERROR(_generate_fill_columns());
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
Expand Down
13 changes: 13 additions & 0 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
} else {
_max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
}

// tnode.olap_scan_node.push_down_agg_type_opt field is deprecated
// Introduced a new field : tnode.push_down_agg_type_opt
//
// make it compatible here
if (tnode.__isset.push_down_agg_type_opt) {
_push_down_agg_type = tnode.push_down_agg_type_opt;
} else if (tnode.olap_scan_node.__isset.push_down_agg_type_opt) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments in the code to explain this compatibility work.

_push_down_agg_type = tnode.olap_scan_node.push_down_agg_type_opt;

} else {
_push_down_agg_type = TPushAggOp::type::NONE;
}
return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/scan/vscan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
}
}

TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
// Get next block.
// If eos is true, no more data will be read and block should be empty.
Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
Expand Down Expand Up @@ -351,6 +352,8 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
std::unordered_map<std::string, int> _colname_to_slot_id;
std::vector<int> _col_distribute_ids;

TPushAggOp::type _push_down_agg_type;

private:
Status _normalize_conjuncts();
Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
Expand Down Expand Up @@ -115,6 +118,20 @@ public List<Rule> buildRules() {
return storageLayerAggregate(agg, project, olapScan, ctx.cascadesContext);
})
),
RuleType.STORAGE_LAYER_AGGREGATE_WITH_PROJECT.build(
logicalAggregate(
logicalProject(
logicalFileScan()
)
)
.when(agg -> agg.isNormalized() && enablePushDownNoGroupAgg())
.thenApply(ctx -> {
LogicalAggregate<LogicalProject<LogicalFileScan>> agg = ctx.root;
LogicalProject<LogicalFileScan> project = agg.child();
LogicalFileScan fileScan = project.child();
return storageLayerAggregate(agg, project, fileScan, ctx.cascadesContext);
})
),
RuleType.ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT.build(
basePattern
.when(agg -> agg.getDistinctArguments().size() == 0)
Expand Down Expand Up @@ -190,14 +207,19 @@ && couldConvertToMulti(agg))
private LogicalAggregate<? extends Plan> storageLayerAggregate(
LogicalAggregate<? extends Plan> aggregate,
@Nullable LogicalProject<? extends Plan> project,
LogicalOlapScan olapScan, CascadesContext cascadesContext) {
LogicalRelation logicalScan, CascadesContext cascadesContext) {
final LogicalAggregate<? extends Plan> canNotPush = aggregate;

KeysType keysType = olapScan.getTable().getKeysType();
if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS) {
if (!(logicalScan instanceof LogicalOlapScan) && !(logicalScan instanceof LogicalFileScan)) {
return canNotPush;
}

if (logicalScan instanceof LogicalOlapScan) {
KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType();
if (keysType != KeysType.AGG_KEYS && keysType != KeysType.DUP_KEYS) {
return canNotPush;
}
}
List<Expression> groupByExpressions = aggregate.getGroupByExpressions();
if (!groupByExpressions.isEmpty() || !aggregate.getDistinctArguments().isEmpty()) {
return canNotPush;
Expand All @@ -213,8 +235,11 @@ private LogicalAggregate<? extends Plan> storageLayerAggregate(
if (!supportedAgg.keySet().containsAll(functionClasses)) {
return canNotPush;
}
if (functionClasses.contains(Count.class) && keysType != KeysType.DUP_KEYS) {
return canNotPush;
if (logicalScan instanceof LogicalOlapScan) {
KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType();
if (functionClasses.contains(Count.class) && keysType != KeysType.DUP_KEYS) {
return canNotPush;
}
}
if (aggregateFunctions.stream().anyMatch(fun -> fun.arity() > 1)) {
return canNotPush;
Expand Down Expand Up @@ -281,12 +306,15 @@ private LogicalAggregate<? extends Plan> storageLayerAggregate(
ExpressionUtils.collect(argumentsOfAggregateFunction, SlotReference.class::isInstance);

List<SlotReference> usedSlotInTable = (List<SlotReference>) (List) Project.findProject(aggUsedSlots,
(List<NamedExpression>) (List) olapScan.getOutput());
(List<NamedExpression>) (List) logicalScan.getOutput());

for (SlotReference slot : usedSlotInTable) {
Column column = slot.getColumn().get();
if (keysType == KeysType.AGG_KEYS && !column.isKey()) {
return canNotPush;
if (logicalScan instanceof LogicalOlapScan) {
KeysType keysType = ((LogicalOlapScan) logicalScan).getTable().getKeysType();
if (keysType == KeysType.AGG_KEYS && !column.isKey()) {
return canNotPush;
}
}
// The zone map max length of CharFamily is 512, do not
// over the length: https://github.com/apache/doris/pull/6293
Expand All @@ -310,19 +338,41 @@ private LogicalAggregate<? extends Plan> storageLayerAggregate(
}
}

PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan()
.build()
.transform(olapScan, cascadesContext)
.get(0);
if (project != null) {
return aggregate.withChildren(ImmutableList.of(
if (logicalScan instanceof LogicalOlapScan) {
PhysicalOlapScan physicalScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan()
.build()
.transform((LogicalOlapScan) logicalScan, cascadesContext)
.get(0);

if (project != null) {
return aggregate.withChildren(ImmutableList.of(
project.withChildren(
ImmutableList.of(new PhysicalStorageLayerAggregate(physicalOlapScan, mergeOp)))
));
ImmutableList.of(new PhysicalStorageLayerAggregate(physicalScan, mergeOp)))
));
} else {
return aggregate.withChildren(ImmutableList.of(
new PhysicalStorageLayerAggregate(physicalScan, mergeOp)
));
}

} else if (logicalScan instanceof LogicalFileScan) {
PhysicalFileScan physicalScan = (PhysicalFileScan) new LogicalFileScanToPhysicalFileScan()
.build()
.transform((LogicalFileScan) logicalScan, cascadesContext)
.get(0);
if (project != null) {
return aggregate.withChildren(ImmutableList.of(
project.withChildren(
ImmutableList.of(new PhysicalStorageLayerAggregate(physicalScan, mergeOp)))
));
} else {
return aggregate.withChildren(ImmutableList.of(
new PhysicalStorageLayerAggregate(physicalScan, mergeOp)
));
}

} else {
return aggregate.withChildren(ImmutableList.of(
new PhysicalStorageLayerAggregate(physicalOlapScan, mergeOp)
));
return canNotPush;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you want

PhysicalOlapScan physicalScan;
    if (logicalScan instanceof LogicalOlapScan) {
             physicalScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan()
                    .build()
                    .transform((LogicalOlapScan) logicalScan, cascadesContext)
                    .get(0);

    } else if (logicalScan instanceof LogicalFileScan) {
            physicalScan = (PhysicalFileScan) new LogicalFileScanToPhysicalFileScan()
                    .build()
                    .transform((LogicalFileScan) logicalScan, cascadesContext)
                    .get(0);            
    } else {
        return canNotPush;
    }

    if (project != null) {
        return aggregate.withChildren(ImmutableList.of(
                    project.withChildren(
                    ImmutableList.of(new PhysicalStorageLayerAggregate(physicalScan, mergeOp)))
            ));
    } else {
            return aggregate.withChildren(ImmutableList.of(
                new PhysicalStorageLayerAggregate(physicalScan, mergeOp)
            ));
    }

you will get this:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.10.1:compile (default-compile) on project fe-core: Compilation failure: Compilation failure: 
[ERROR] /mnt/datadisk1/changyuwei/doris2/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java:[345,72] incompatible types: org.apache.doris.nereids.trees.plans.physical.PhysicalRelation cannot be converted to org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation
[ERROR] /mnt/datadisk1/changyuwei/doris2/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java:[349,51] incompatible types: org.apache.doris.nereids.trees.plans.physical.PhysicalRelation cannot be converted to org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation

😭😤😭

}
}

Expand Down
Loading