[Improvement] Optimize count operation for iceberg #22923

Merged · 6 commits · Aug 18, 2023
1 change: 1 addition & 0 deletions be/src/vec/exec/format/generic_reader.h
@@ -36,6 +36,7 @@ class GenericReader {
void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
_push_down_agg_type = push_down_agg_type;
}

virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0;

virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() {
33 changes: 31 additions & 2 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -90,14 +90,15 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
RuntimeProfile* profile, RuntimeState* state,
const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx)
io::IOContext* io_ctx, int64_t push_down_count)
: TableFormatReader(std::move(file_format_reader)),
_profile(profile),
_state(state),
_params(params),
_range(range),
_kv_cache(kv_cache),
_io_ctx(io_ctx) {
_io_ctx(io_ctx),
_remaining_push_down_count(push_down_count) {
static const char* iceberg_profile = "IcebergProfile";
ADD_TIMER(_profile, iceberg_profile);
_iceberg_profile.num_delete_files =
@@ -132,10 +133,27 @@ Status IcebergTableReader::init_reader(
_all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);

return status;
}

Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
// The exact count was computed by the FE from snapshot metadata and pushed
// down, so emit placeholder rows instead of reading the file.
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
auto rows =
std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size);
_remaining_push_down_count -= rows;
for (auto& col : block->mutate_columns()) {
col->resize(rows);
}
*read_rows = rows;
if (_remaining_push_down_count == 0) {
*eof = true;
}

return Status::OK();
}

// To support Iceberg schema evolution, we rename the columns in the block to
// match the column names in the Parquet file before reading data, and set the
// names back to the table column names before returning the block.
@@ -149,6 +167,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
}
block->initialize_index_by_name();
}

auto res = _file_format_reader->get_next_block(block, read_rows, eof);
// Set the name back to table column name before return this block.
if (_has_schema_change) {
@@ -182,6 +201,11 @@ Status IcebergTableReader::get_columns(
}

Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
// The count is served from the pushed-down value, so the delete files do not need to be read.
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
return Status::OK();
}

auto& table_desc = range.table_format_params.iceberg_params;
auto& version = table_desc.format_version;
if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
@@ -192,10 +216,15 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
if (files.empty()) {
return Status::OK();
}

if (delete_file_type == POSITION_DELETE) {
RETURN_IF_ERROR(_position_delete(files));
}

// TODO: equality deletes.
// If this is a count operation and the split carries equality delete files,
// the count push-down for this split must be canceled.

COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
return Status::OK();
}
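The short-circuit in get_next_block above never touches the Parquet file: the exact count arrives from the FE, so the reader only has to emit that many placeholder rows, batch by batch. The following standalone sketch mirrors that loop under assumed values (a pushed-down count of 10000 and a batch_size of 4096 are illustration-only; the real reader resizes the Block's columns instead of printing):

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
    int64_t remaining = 10000;        // assumed pushed-down count from the FE
    const int64_t batch_size = 4096;  // assumed query option batch_size
    while (remaining > 0) {
        int64_t rows = std::min(remaining, batch_size);
        remaining -= rows;
        // The real reader resizes each column of the output Block to `rows`;
        // count(*) only looks at row counts, never at the values.
        bool eof = (remaining == 0);
        std::printf("emitted %lld rows, eof=%d\n", (long long)rows, (int)eof);
    }
    return 0;
}

With these numbers the loop emits batches of 4096, 4096, and 1808 rows and sets eof on the last one, matching the _remaining_push_down_count bookkeeping in the diff.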
6 changes: 4 additions & 2 deletions be/src/vec/exec/format/table/iceberg_reader.h
@@ -69,8 +69,8 @@ class IcebergTableReader : public TableFormatReader {

IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx);
const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
int64_t push_down_count);
~IcebergTableReader() override = default;

Status init_row_filters(const TFileRangeDesc& range) override;
@@ -154,6 +154,8 @@ class IcebergTableReader : public TableFormatReader {
io::IOContext* _io_ctx;
bool _has_schema_change = false;
bool _has_iceberg_schema = false;

int64_t _remaining_push_down_count;
};
} // namespace vectorized
} // namespace doris
6 changes: 3 additions & 3 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -699,9 +699,9 @@ Status VFileScanner::_get_next_reader() {
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
std::unique_ptr<IcebergTableReader> iceberg_reader =
IcebergTableReader::create_unique(std::move(parquet_reader), _profile,
_state, *_params, range, _kv_cache,
_io_ctx.get());
IcebergTableReader::create_unique(
std::move(parquet_reader), _profile, _state, *_params, range,
_kv_cache, _io_ctx.get(), _parent->get_push_down_count());
init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
5 changes: 5 additions & 0 deletions be/src/vec/exec/scan/vscan_node.cpp
@@ -126,6 +126,11 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
} else {
_push_down_agg_type = TPushAggOp::type::NONE;
}

if (tnode.__isset.push_down_count) {
_push_down_count = tnode.push_down_count;
}

return Status::OK();
}

6 changes: 6 additions & 0 deletions be/src/vec/exec/scan/vscan_node.h
@@ -136,6 +136,9 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
}

TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }

int64_t get_push_down_count() { return _push_down_count; }

// Get next block.
// If eos is true, no more data will be read and block should be empty.
Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
@@ -349,6 +352,9 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {

TPushAggOp::type _push_down_agg_type;

// Pushed-down result of the aggregate function 'count', computed by the FE; -1 means not set.
int64_t _push_down_count = -1;

private:
Status _normalize_conjuncts();
Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context,
@@ -1191,6 +1191,10 @@ public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) {
this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp;
}

public TPushAggOp getPushDownAggNoGroupingOp() {
return pushDownAggNoGroupingOp;
}

public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
return false;
}
@@ -18,8 +18,10 @@
package org.apache.doris.planner.external.iceberg;

import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
@@ -43,6 +45,8 @@
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TIcebergDeleteFileDesc;
import org.apache.doris.thrift.TIcebergFileDesc;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TTableFormatFileDesc;

import avro.shaded.com.google.common.base.Preconditions;
@@ -55,6 +59,7 @@
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@@ -68,6 +73,7 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -79,6 +85,9 @@ public class IcebergScanNode extends FileQueryScanNode {

public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
public static final String DEFAULT_DATA_PATH = "/data/";
private static final String TOTAL_RECORDS = "total-records";
private static final String TOTAL_POSITION_DELETES = "total-position-deletes";
private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";

private IcebergSource source;
private Table icebergTable;
@@ -210,8 +219,8 @@ public List<Split> getSplits() throws UserException {
splitTask.length(),
splitTask.file().fileSizeInBytes(),
new String[0],
formatVersion,
source.getCatalog().getProperties());
split.setFormatVersion(formatVersion);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
}
@@ -222,6 +231,12 @@
throw new UserException(e.getMessage(), e.getCause());
}

TPushAggOp aggOp = getPushDownAggNoGroupingOp();
if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
// Return a single placeholder split and skip planning the rest;
// the count is answered from snapshot metadata.
return Collections.singletonList(splits.get(0));
}

readPartitionNum = partitionPathSet.size();

return splits;
@@ -334,4 +349,50 @@ public TableIf getTargetTable() {
public Map<String, String> getLocationProperties() throws UserException {
return source.getCatalog().getProperties();
}

@Override
public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
String aggFunctionName = aggExpr.getFnName().getFunction().toUpperCase();
return "COUNT".equals(aggFunctionName);
}

@Override
public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) {
return !col.isAllowNull();
}

private long getCountFromSnapshot() {
Long specifiedSnapshot;
try {
specifiedSnapshot = getSpecifiedSnapshot();
} catch (UserException e) {
return -1;
}

Snapshot snapshot = specifiedSnapshot == null
? icebergTable.currentSnapshot() : icebergTable.snapshot(specifiedSnapshot);

// empty table
if (snapshot == null) {
return -1;
}

Map<String, String> summary = snapshot.summary();
if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) {
return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
} else {
return -1;
}
}

@Override
protected void toThrift(TPlanNode planNode) {
super.toThrift(planNode);
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
long countFromSnapshot = getCountFromSnapshot();
if (countFromSnapshot > 0) {
planNode.setPushDownCount(countFromSnapshot);
}
}
}
}
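getCountFromSnapshot above works because Iceberg snapshot summaries already track row and delete totals: when total-equality-deletes is 0, the exact count is total-records minus total-position-deletes; otherwise it cannot be derived without a scan, and -1 cancels the push-down. A self-contained C++ sketch of that arithmetic (the summary contents are assumed for illustration; the keys match the constants in this diff):

#include <cstdint>
#include <cstdio>
#include <string>
#include <unordered_map>

// Returns the exact row count derivable from a snapshot summary,
// or -1 when it cannot be derived (missing keys or equality deletes present).
int64_t count_from_summary(const std::unordered_map<std::string, std::string>& summary) {
    auto eq = summary.find("total-equality-deletes");
    auto records = summary.find("total-records");
    auto pos_deletes = summary.find("total-position-deletes");
    if (eq == summary.end() || records == summary.end() ||
            pos_deletes == summary.end() || eq->second != "0") {
        return -1;
    }
    return std::stoll(records->second) - std::stoll(pos_deletes->second);
}

int main() {
    // Assumed example: 1000 rows written, 10 removed by position deletes.
    std::unordered_map<std::string, std::string> summary = {
            {"total-records", "1000"},
            {"total-position-deletes", "10"},
            {"total-equality-deletes", "0"}};
    std::printf("count = %lld\n", (long long)count_from_summary(summary));  // count = 990
    return 0;
}

Unlike the Java method, this sketch also guards against missing keys; the merged code assumes the summary always carries all three entries.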
@@ -27,10 +27,12 @@

@Data
public class IcebergSplit extends FileSplit {

// The file path changes whenever the file is modified, so there is no need to track the modification time.
public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
Map<String, String> config) {
Integer formatVersion, Map<String, String> config) {
super(file, start, length, fileLength, hosts, null);
this.formatVersion = formatVersion;
this.config = config;
}

2 changes: 2 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
@@ -1145,6 +1145,8 @@ struct TPlanNode {
47: optional TTestExternalScanNode test_external_scan_node

48: optional TPushAggOp push_down_agg_type_opt

49: optional i64 push_down_count

101: optional list<Exprs.TExpr> projections
102: optional Types.TTupleId output_tuple_id
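End to end, the new optional field carries the FE-computed count to the BE: IcebergScanNode.toThrift sets push_down_count only when the snapshot yields a usable value, and VScanNode::init reads it behind the __isset check. A minimal sketch of that consumer-side contract (TPlanNode here is a hand-written stand-in, not the generated Thrift code):

#include <cstdint>

// Stand-in for the Thrift-generated struct; the real one exposes an
// __isset flag per optional member.
struct TPlanNode {
    struct { bool push_down_count = false; } __isset;
    int64_t push_down_count = 0;
};

// -1, the VScanNode default, means "no count was pushed down".
int64_t read_push_down_count(const TPlanNode& tnode) {
    return tnode.__isset.push_down_count ? tnode.push_down_count : -1;
}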