Skip to content

Commit

Permalink
[opt](hive)opt select count(*) stmt push down agg on hive
Browse files Browse the repository at this point in the history
  • Loading branch information
hubgeter committed Jul 22, 2023
1 parent 3b244e6 commit 825af66
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 28 deletions.
5 changes: 3 additions & 2 deletions be/src/vec/exec/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ class Block;
class GenericReader {
public:
virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;

virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof,TPushAggOp::type push_down_agg_type_opt) {

// Overload of get_next_block that also receives the aggregate operation
// (push_down_agg_type_opt) pushed down to the scan.
// This default implementation touches none of its arguments and always returns
// Status::NotSupported; a reader that can serve a pushed-down aggregate must
// override it.
virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof,
TPushAggOp::type push_down_agg_type_opt) {
return Status::NotSupported("not support this type!");
};

Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ class RowGroupReader {
int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }

ParquetColumnReader::Statistics statistics();
int64_t get__remaining_rows(){ return _remaining_rows;}
// Returns the current value of _remaining_rows for this row group reader.
int64_t get__remaining_rows() { return _remaining_rows; }

private:
void _merge_read_ranges(std::vector<RowRange>& row_ranges);
Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof);
Expand Down
11 changes: 5 additions & 6 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,24 +511,23 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
return Status::OK();
}

Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof,TPushAggOp::type push_down_agg_type_opt) {

if (push_down_agg_type_opt != TPushAggOp::type::COUNT){
Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof,
TPushAggOp::type push_down_agg_type_opt) {
if (push_down_agg_type_opt != TPushAggOp::type::COUNT) {
return Status::NotSupported("min/max push down is not supported for parquet files");
}
size_t rows = 0;


// Do not use _t_metadata->num_rows here: for the same file, the optimizer may
// generate multiple VFileScanners with different _scan_range values, so the
// row count is accumulated per row group instead.
while (_read_row_groups.size() > 0) {
_next_row_group_reader();
rows+=_current_group_reader->get__remaining_rows();
rows += _current_group_reader->get__remaining_rows();
}

// Resizing a single column to the row count is enough.
auto cols = block->mutate_columns();
for(auto& col:cols) {
for (auto& col : cols) {
col->resize(rows);
break;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ class ParquetReader : public GenericReader {

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

Status get_next_block(Block* block, size_t* read_rows, bool* eof,TPushAggOp::type push_down_agg_type_opt) override;
Status get_next_block(Block* block, size_t* read_rows, bool* eof,
TPushAggOp::type push_down_agg_type_opt) override;
void close();

RowRange get_whole_range() { return _whole_range; }
Expand Down
7 changes: 3 additions & 4 deletions be/src/vec/exec/scan/new_olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ Status NewOlapScanNode::_process_conjuncts() {

Status NewOlapScanNode::_build_key_ranges_and_filters() {
if (push_down_agg_type_opt == TPushAggOp::NONE) {
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
DCHECK(column_types.size() == column_names.size());

Expand Down Expand Up @@ -325,9 +325,8 @@ Status NewOlapScanNode::_build_key_ranges_and_filters() {
range);
}
} else {
_runtime_profile->add_info_string(
"PushDownAggregate",
push_down_agg_to_string(push_down_agg_type_opt));
_runtime_profile->add_info_string("PushDownAggregate",
push_down_agg_to_string(push_down_agg_type_opt));
}

if (_state->enable_profile()) {
Expand Down
13 changes: 6 additions & 7 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,20 +255,19 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.direct_mode = true;
_aggregation = true;
} else {
_tablet_reader_params.direct_mode =
_aggregation || single_version ||
(_parent->push_down_agg_type_opt != TPushAggOp::NONE);
}
_tablet_reader_params.direct_mode = _aggregation || single_version ||
(_parent->push_down_agg_type_opt != TPushAggOp::NONE);
}

RETURN_IF_ERROR(_init_return_columns());

_tablet_reader_params.tablet = _tablet;
_tablet_reader_params.tablet_schema = _tablet_schema;
_tablet_reader_params.reader_type = ReaderType::READER_QUERY;
_tablet_reader_params.aggregation = _aggregation;
if (_parent->push_down_agg_type_opt) {
_tablet_reader_params.push_down_agg_type_opt = _parent->push_down_agg_type_opt;
}
if (_parent->push_down_agg_type_opt) {
_tablet_reader_params.push_down_agg_type_opt = _parent->push_down_agg_type_opt;
}
_tablet_reader_params.version = Version(0, _version);

// TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`,
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
#include "vec/core/block.h"
#include "vec/exec/scan/new_olap_scanner.h" // IWYU pragma: keep
#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vscanner.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exec/scan/vscanner.h"
#include "vfile_scanner.h"

namespace doris::vectorized {
Expand Down
10 changes: 5 additions & 5 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,13 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo

// Read next block.

if ( _parent -> push_down_agg_type_opt != TPushAggOp::type ::NONE ){
if (_parent->push_down_agg_type_opt != TPushAggOp::type ::NONE) {
// The FE may misjudge a "select count/min/max ..." statement, so the
// push-down result is only used when the reader reports success.
if (Status::OK() ==
_cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof,_parent-> push_down_agg_type_opt ))
{
if (Status::OK() == _cur_reader->get_next_block(_src_block_ptr, &read_rows,
&_cur_reader_eof,
_parent->push_down_agg_type_opt)) {
_cur_reader.reset(nullptr);
_cur_reader_eof=true;
_cur_reader_eof = true;
return Status::OK();
}
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/scan/vscan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,10 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {

std::unordered_map<std::string, int> _colname_to_slot_id;
std::vector<int> _col_distribute_ids;

public:
TPushAggOp::type push_down_agg_type_opt;
TPushAggOp::type push_down_agg_type_opt;

private:
Status _normalize_conjuncts();
Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context,
Expand Down

0 comments on commit 825af66

Please sign in to comment.