[Opt](Iceberg) handle count pushdown in fe side
zhangbutao committed May 15, 2024
1 parent fd749b8 commit d08ddcc
Showing 4 changed files with 38 additions and 1 deletion.
@@ -71,6 +71,7 @@ public abstract class FileScanNode extends ExternalScanNode {
protected long totalPartitionNum = 0;
protected long readPartitionNum = 0;
protected long fileSplitSize;
public long rowCount = 0;

public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {

@@ -212,6 +212,11 @@ private List<Split> doGetSplits() throws UserException {
HashSet<String> partitionPathSet = new HashSet<>();
boolean isPartitionedTable = icebergTable.spec().isPartitioned();

long rowCount = getCountFromSnapshot();
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount > 0) {
this.rowCount = rowCount;
return new ArrayList<>();
}
CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
@@ -264,6 +269,7 @@ private List<Split> doGetSplits() throws UserException {
throw new UserException(e.getMessage(), e.getCause());
}

// TODO: Need to delete this as we can handle count pushdown in fe side
TPushAggOp aggOp = getPushDownAggNoGroupingOp();
if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
// we can create a special empty split and skip the plan process
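The early return above depends on getCountFromSnapshot(), which is not part of this diff. As a rough illustration of where such a count can come from, the following sketch reads the total-records property of the current snapshot summary via the Iceberg API; the class and method names are hypothetical, and it assumes no delete files need to be applied.

// Hypothetical sketch, not part of this commit: one way a row count can be
// read from Iceberg snapshot metadata, assuming the snapshot summary carries
// the total-records property and no delete files need to be applied.
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;

public final class SnapshotCountSketch {
    private SnapshotCountSketch() {}

    // Returns the record count recorded in the current snapshot summary,
    // or -1 when the table has no snapshot or the property is absent.
    public static long countFromSnapshot(Table table) {
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot == null) {
            return -1;
        }
        String totalRecords = snapshot.summary().get(SnapshotSummary.TOTAL_RECORDS_PROP);
        return totalRecords == null ? -1 : Long.parseLong(totalRecords);
    }
}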

@@ -27,6 +27,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.nereids.CascadesContext.Lock;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -54,6 +55,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
@@ -414,6 +416,7 @@ private PhysicalPlan chooseBestPlan(Group rootGroup, PhysicalProperties physical

/**
* getting hints explain string, which specified by enumerate and show in lists
*
* @param hints hint map recorded in statement context
* @return explain string shows using of hint
*/
@@ -594,6 +597,18 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
);
}
return Optional.of(resultSet);
} else if (child instanceof PhysicalHashAggregate && getScanNodes().get(0) instanceof IcebergScanNode) {
List<Column> columns = Lists.newArrayList();
NamedExpression output = physicalPlan.getOutput().get(0);
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
if (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0) {
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(
Lists.newArrayList(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount))));
// only support one iceberg scan node and one count, e.g. select count(*) from icetbl;
return Optional.of(resultSet);
}
return Optional.empty();
} else {
return Optional.empty();
}
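With the IcebergScanNode change above, this branch lets the Nereids planner answer a bare count over a single Iceberg scan entirely in the FE: when the plan child is a PhysicalHashAggregate and the lone IcebergScanNode carries a positive rowCount, a one-column, one-row result set is returned instead of dispatching fragments to the BEs. The sketch below isolates that result-set construction as an illustration only; the helper name, the BIGINT column type, and the package locations of ResultSet and ResultSetMetaData are assumptions rather than code taken from the commit.

// Illustrative sketch of the FE-side result construction used above; the
// count is passed in directly here, whereas the real code reads it from
// IcebergScanNode.rowCount.
import java.util.Collections;
import java.util.List;

import com.google.common.collect.Lists;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.qe.CommonResultSet;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;

public final class CountResultSketch {
    private CountResultSketch() {}

    // Builds a single-cell result set holding the pushed-down count value.
    public static ResultSet buildCountResult(String columnLabel, long rowCount) {
        List<Column> columns = Lists.newArrayList(new Column(columnLabel, Type.BIGINT));
        ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
        List<String> row = Lists.newArrayList(String.valueOf(rowCount));
        return new CommonResultSet(metadata, Collections.singletonList(row));
    }
}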

@@ -40,6 +40,7 @@
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.nereids.PlannerHook;
import org.apache.doris.qe.CommonResultSet;
import org.apache.doris.qe.ConnectContext;
@@ -636,13 +637,27 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
return Optional.empty();
}
SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
if (!parsedSelectStmt.getTableRefs().isEmpty()) {
if (!(singleNodePlanner.getScanNodes().get(0) instanceof IcebergScanNode)
&& !parsedSelectStmt.getTableRefs().isEmpty()) {
return Optional.empty();
}

List<SelectListItem> selectItems = parsedSelectStmt.getSelectList().getItems();
List<Column> columns = new ArrayList<>(selectItems.size());
List<String> columnLabels = parsedSelectStmt.getColLabels();
List<String> data = new ArrayList<>();
if (((IcebergScanNode) getScanNodes().get(0)).rowCount > 0) {
SelectListItem item = selectItems.get(0);
Expr expr = item.getExpr();
String columnName = columnLabels.get(0);
columns.add(new Column(columnName, expr.getType()));
data.add(String.valueOf(((IcebergScanNode) getScanNodes().get(0)).rowCount));
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
// only support one iceberg scan node and one count, e.g. select count(*) from icetbl;
return Optional.of(resultSet);
}

for (int i = 0; i < selectItems.size(); i++) {
SelectListItem item = selectItems.get(i);
Expr expr = item.getExpr();
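The legacy planner path mirrors the Nereids change: for a query such as select count(*) from icetbl, it checks that the first scan node is an IcebergScanNode with a resolved rowCount and answers the count from FE metadata. The hypothetical helper below spells out that guard in one place; the class name and the explicit single-scan-node check are drawn from the in-code comment ("only support one iceberg scan node and one count") rather than from code in the commit.

// Hypothetical helper, not in the commit: the guard both planner paths apply
// before answering count(*) in the FE from Iceberg snapshot metadata.
import java.util.List;

import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.planner.ScanNode;

public final class CountPushdownGuard {
    private CountPushdownGuard() {}

    // True only for a plan with exactly one scan node that is an Iceberg scan
    // whose row count was already filled from the snapshot summary.
    public static boolean canAnswerCountInFe(List<ScanNode> scanNodes) {
        if (scanNodes.size() != 1) {
            return false;
        }
        ScanNode node = scanNodes.get(0);
        return node instanceof IcebergScanNode && ((IcebergScanNode) node).rowCount > 0;
    }
}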
