Skip to content

Commit

Permalink
[Enhancement] remove iceberg getPrunedPartitions interface (#52830)
Browse files Browse the repository at this point in the history
Why I'm doing:
getPrunedPartitions for iceberg query would do planFiles task which could task long time in optimizer RBO stage.

What I'm doing:
remove getPrunedPartitions in OptExternalPartitionPruner for iceberg table
2.get MV Compensate Partitions use Mv base table updateInfo instead of PrunedPartitions
Fixes [daily benchmark] query iceberg table error StarRocksTest#8795

Signed-off-by: Youngwb <[email protected]>
  • Loading branch information
Youngwb authored Nov 14, 2024
1 parent b64adaf commit 8c16a01
Show file tree
Hide file tree
Showing 20 changed files with 170 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public Set<String> getToRefreshPartitionNames() {
return toRefreshPartitionNames;
}

public Map<String, PCell> getNameToPartKeys() {
return nameToPartKeys;
}

/**
* Add partition names that base table needs to be refreshed
* @param toRefreshPartitionNames the partition names that need to be refreshed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,6 @@ public Statistics getTableStatistics(OptimizerContext session, Table table, Map<
return normal.getTableStatistics(session, table, columns, partitionKeys, predicate, limit, version);
}

@Override
public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predicate, long limit, TableVersionRange version) {
return normal.getPrunedPartitions(table, predicate, limit, version);
}

@Override
public Set<DeleteFile> getDeleteFiles(IcebergTable table, Long snapshotId, ScalarOperator predicate, FileContent content) {
return normal.getDeleteFiles(table, snapshotId, predicate, content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,6 @@ default boolean prepareMetadata(MetaPreparationItem item, Tracers tracers, Conne
return true;
}

default List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predicate,
long limit, TableVersionRange version) {
throw new StarRocksConnectorException("This connector doesn't support pruning partitions");
}

// return true if the connector has self info schema
default boolean hasSelfInfoSchema() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,6 @@ private void triggerIcebergPlanFilesIfNeeded(PredicateSearchKey key, IcebergTabl
}
}

@Override
public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predicate, long limit, TableVersionRange version) {
IcebergTable icebergTable = (IcebergTable) table;
String dbName = icebergTable.getRemoteDbName();
Expand Down Expand Up @@ -942,9 +941,9 @@ private Iterator<FileScanTask> buildFileScanTaskIterator(IcebergTable icebergTab
scanContext.setLocalPlanningMaxSlotSize(catalogProperties.getLocalPlanningMaxSlotBytes());

TableScan scan = icebergCatalog.getTableScan(nativeTbl, scanContext)
.useSnapshot(snapshotId)
.metricsReporter(metricsReporter)
.planWith(jobPlanningExecutor);
.useSnapshot(snapshotId)
.metricsReporter(metricsReporter)
.planWith(jobPlanningExecutor);

if (enableCollectColumnStats) {
scan = scan.includeColumnStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,6 @@ public List<PartitionInfo> getPartitions(Table table, List<String> partitionName
return metadata.getPartitions(table, partitionNames);
}

@Override
public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predicate, long limit, TableVersionRange version) {
ConnectorMetadata metadata = metadataOfTable(table);
return metadata.getPrunedPartitions(table, predicate, limit, version);
}

@Override
public Statistics getTableStatistics(OptimizerContext session, Table table, Map<ColumnRefOperator, Column> columns,
List<PartitionKey> partitionKeys, ScalarOperator predicate, long limit,
Expand Down
14 changes: 0 additions & 14 deletions fe/fe-core/src/main/java/com/starrocks/server/MetadataMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -659,20 +659,6 @@ public List<String> listPartitionNamesByValue(String catalogName, String dbName,
return ImmutableList.copyOf(partitionNames.build());
}

public List<PartitionKey> getPrunedPartitions(String catalogName, Table table, ScalarOperator predicate,
long limit, TableVersionRange version) {
Optional<ConnectorMetadata> connectorMetadata = getOptionalMetadata(catalogName);
if (connectorMetadata.isPresent()) {
try {
return connectorMetadata.get().getPrunedPartitions(table, predicate, limit, version);
} catch (Exception e) {
LOG.error("Failed to getPrunedPartitions on [{}.{}]", catalogName, table, e);
throw e;
}
}
return new ArrayList<>();
}

public Statistics getTableStatisticsFromInternalStatistics(Table table, Map<ColumnRefOperator, Column> columns) {
List<ColumnRefOperator> requiredColumnRefs = new ArrayList<>(columns.keySet());
List<String> columnNames = requiredColumnRefs.stream().map(col -> columns.get(col).getName()).collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.starrocks.catalog.Column;
import com.starrocks.catalog.DeltaLakeTable;
import com.starrocks.catalog.HiveMetaStoreTable;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.catalog.PaimonTable;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionKey;
Expand Down Expand Up @@ -424,30 +423,6 @@ private static void computePartitionInfo(LogicalScanOperator operator, Optimizer

scanOperatorPredicates.setSelectedPartitionIds(selectedPartitionIds);
scanOperatorPredicates.getNoEvalPartitionConjuncts().addAll(partitionPruner.getNoEvalConjuncts());
} else if (table instanceof IcebergTable) {
IcebergTable icebergTable = (IcebergTable) table;
if (operator.getTableVersionRange().end().isEmpty()) {
// TODO: for iceberg table, it cannot decide whether it's pruned or not when `selectedPartitionIds`
// is empty. It's expensive to set all partitions here.
return;
}

// Use mutable map instead of immutable map so can be re-partition-prune, see ScanOperatorPredicates#clear().
Map<Long, PartitionKey> partitionKeyMap = Maps.newHashMap();
if (table.isUnPartitioned()) {
partitionKeyMap.put(0L, new PartitionKey());
} else {
String catalogName = icebergTable.getCatalogName();
List<PartitionKey> partitionKeys = GlobalStateMgr.getCurrentState().getMetadataMgr()
.getPrunedPartitions(catalogName, icebergTable, operator.getPredicate(),
operator.getLimit(), operator.getTableVersionRange());
for (PartitionKey partitionKey : partitionKeys) {
partitionKeyMap.put(context.getNextUniquePartitionId(), partitionKey);
}
}

scanOperatorPredicates.getIdToPartitionKey().putAll(partitionKeyMap);
scanOperatorPredicates.setSelectedPartitionIds(partitionKeyMap.keySet());
} else if (table instanceof PaimonTable) {
PaimonTable paimonTable = (PaimonTable) table;
List<String> fieldNames = operator.getColRefToColumnMetaMap().keySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@
public class MvPartitionCompensator {
private static final Logger LOG = LogManager.getLogger(MvPartitionCompensator.class);

/**
* External scan operators should be supported if it has been supported in
* {@link com.starrocks.sql.optimizer.rewrite.OptExternalPartitionPruner}
*/
// supported external scan types for partition compensate
public static final ImmutableSet<OperatorType> SUPPORTED_PARTITION_COMPENSATE_EXTERNAL_SCAN_TYPES =
ImmutableSet.<OperatorType>builder()
.add(OperatorType.LOGICAL_HIVE_SCAN)
Expand All @@ -115,6 +112,15 @@ public class MvPartitionCompensator {
.addAll(SUPPORTED_PARTITION_COMPENSATE_EXTERNAL_SCAN_TYPES)
.build();

/**
* External scan operators could use {@link com.starrocks.sql.optimizer.rewrite.OptExternalPartitionPruner}
* to prune partitions, so we need to compensate partition predicates for them.
*/
public static final ImmutableSet<OperatorType> SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES =
ImmutableSet.<OperatorType>builder()
.add(OperatorType.LOGICAL_HIVE_SCAN)
.build();

/**
* Whether the table is supported to compensate extra partition predicates.
* @param t: input table
Expand Down Expand Up @@ -436,6 +442,10 @@ private static boolean supportCompensatePartitionPredicateForHiveScan(List<Parti

private static List<ScalarOperator> compensatePartitionPredicateForExternalTables(MaterializationContext mvContext,
LogicalScanOperator scanOperator) {
if (!SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES.contains(scanOperator.getOpType())) {
return Lists.newArrayList();
}

ScanOperatorPredicates scanOperatorPredicates = null;
try {
scanOperatorPredicates = scanOperator.getScanOperatorPredicates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import com.starrocks.common.util.DebugUtil;
import com.starrocks.connector.PartitionUtil;
import com.starrocks.qe.SessionVariable;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.PListCell;
import com.starrocks.sql.common.PRangeCell;
import com.starrocks.sql.optimizer.MaterializationContext;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.operator.OperatorType;
Expand All @@ -53,8 +56,10 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.connector.PartitionUtil.createPartitionKey;
import static com.starrocks.connector.PartitionUtil.generateMVPartitionName;
import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVRewrite;
import static com.starrocks.sql.optimizer.rule.transformation.materialization.MvPartitionCompensator.SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES;

/**
* MVCompensationBuilder is used to build a mv compensation for materialized view in mv rewrite.
Expand Down Expand Up @@ -234,7 +239,7 @@ private MVCompensation getMVCompensationOfTable(Table refBaseTable,
return getMVCompensationOfOlapTable(refBaseTable, partitionNamesToRefresh,
(LogicalOlapScanOperator) scanOperator);
} else if (MvPartitionCompensator.isTableSupportedPartitionCompensate(refBaseTable)) {
return getMVCompensationForExternal(partitionNamesToRefresh, scanOperator);
return getMVCompensationForExternal(refBaseTable, partitionNamesToRefresh, scanOperator);
} else {
SessionVariable sessionVariable = mvContext.getOptimizerContext().getSessionVariable();
return MVCompensation.createUnkownState(sessionVariable);
Expand Down Expand Up @@ -295,38 +300,47 @@ private MVCompensation ofBaseTableCompensations(Map<Table, BaseCompensation<?>>
}
}

private MVCompensation getMVCompensationForExternal(Set<String> refTablePartitionNamesToRefresh,
private MVCompensation getMVCompensationForExternal(Table refBaseTable,
Set<String> refTablePartitionNamesToRefresh,
LogicalScanOperator refScanOperator) {
SessionVariable sessionVariable = mvContext.getOptimizerContext().getSessionVariable();
try {
ScanOperatorPredicates scanOperatorPredicates = refScanOperator.getScanOperatorPredicates();
Collection<Long> selectPartitionIds = scanOperatorPredicates.getSelectedPartitionIds();
if (Objects.isNull(selectPartitionIds) || selectPartitionIds.size() == 0) {
// see OptExternalPartitionPruner#computePartitionInfo:
// it's not the same meaning when selectPartitionIds is null and empty for hive and other tables
if (refScanOperator.getOpType() == OperatorType.LOGICAL_HIVE_SCAN) {
List<PartitionKey> selectPartitionKeys = scanOperatorPredicates.getSelectedPartitionKeys();
// For scan operator which support prune partitions with OptExternalPartitionPruner,
// we could only compensate partitions which selected partitions need to refresh.
if (SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES.contains(refScanOperator.getOpType())) {
if (Objects.isNull(selectPartitionIds) || selectPartitionIds.isEmpty()) {
// see OptExternalPartitionPruner#computePartitionInfo:
// it's not the same meaning when selectPartitionIds is null and empty for hive and other tables
if (refScanOperator.getOpType() == OperatorType.LOGICAL_HIVE_SCAN) {
return MVCompensation.createNoCompensateState(sessionVariable);
} else {
return MVCompensation.createUnkownState(sessionVariable);
}
}

if (selectPartitionKeys.stream()
.map(PartitionUtil::generateMVPartitionName)
.noneMatch(refTablePartitionNamesToRefresh::contains)) {
return MVCompensation.createNoCompensateState(sessionVariable);
} else {
return MVCompensation.createUnkownState(sessionVariable);
}
}
List<PartitionKey> selectPartitionKeys = scanOperatorPredicates.getSelectedPartitionKeys();
if (selectPartitionKeys.stream()
.map(PartitionUtil::generateMVPartitionName)
.noneMatch(x -> refTablePartitionNamesToRefresh.contains(x))) {
return MVCompensation.createNoCompensateState(sessionVariable);
}
// if mv's to refresh partitions contains any of query's select partition ids, then rewrite with compensate.
List<PartitionKey> toRefreshRefTablePartitions = getMVCompensatePartitionsOfExternal(
List<PartitionKey> toRefreshRefTablePartitions = getMVCompensatePartitionsOfExternal(refBaseTable,
refTablePartitionNamesToRefresh, refScanOperator);
if (toRefreshRefTablePartitions == null) {
return MVCompensation.createUnkownState(sessionVariable);
}

Table table = refScanOperator.getTable();
if (Sets.newHashSet(toRefreshRefTablePartitions).containsAll(selectPartitionKeys)) {
logMVRewrite(mvContext, "All external table {}'s selected partitions {} need to refresh, no rewrite",
table.getName(), selectPartitionIds);
return MVCompensation.createNoRewriteState(sessionVariable);
if (SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES.contains(refScanOperator.getOpType())) {
if (Sets.newHashSet(toRefreshRefTablePartitions).containsAll(selectPartitionKeys)) {
logMVRewrite(mvContext, "All external table {}'s selected partitions {} need to refresh, no rewrite",
table.getName(), selectPartitionIds);
return MVCompensation.createNoRewriteState(sessionVariable);
}
}
return ofExternalTableCompensation(table, toRefreshRefTablePartitions);
} catch (AnalysisException e) {
Expand Down Expand Up @@ -359,9 +373,24 @@ private List<Long> getMVCompensatePartitionsOfOlap(Set<String> partitionNamesToR
return refTableCompensatePartitionIds;
}

private List<PartitionKey> getMVCompensatePartitionsOfExternal(Set<String> refTablePartitionNamesToRefresh,
private List<PartitionKey> getMVCompensatePartitionsOfExternal(Table refBaseTable,
Set<String> refTablePartitionNamesToRefresh,
LogicalScanOperator refScanOperator)
throws AnalysisException {
if (SUPPORTED_PARTITION_PRUNE_EXTERNAL_SCAN_TYPES.contains(refScanOperator.getOpType())) {
// For external table which support partition prune with OptExternalPartitionPruner,
// could use selectPartitionKeys to get the compensate partitions.
return getMVCompensatePartitionsOfExternalWithPartitionPruner(refTablePartitionNamesToRefresh,
refScanOperator);
} else {
return getMVCompensatePartitionsOfExternalWithoutPartitionPruner(refBaseTable, refTablePartitionNamesToRefresh);
}
}

private List<PartitionKey> getMVCompensatePartitionsOfExternalWithPartitionPruner(
Set<String> refTablePartitionNamesToRefresh,
LogicalScanOperator refScanOperator) {
List<PartitionKey> refTableCompensatePartitionKeys = Lists.newArrayList();
ScanOperatorPredicates scanOperatorPredicates = null;
try {
scanOperatorPredicates = refScanOperator.getScanOperatorPredicates();
Expand All @@ -371,7 +400,6 @@ private List<PartitionKey> getMVCompensatePartitionsOfExternal(Set<String> refTa
if (scanOperatorPredicates == null) {
return null;
}
List<PartitionKey> refTableCompensatePartitionKeys = Lists.newArrayList();
List<PartitionKey> selectPartitionKeys = scanOperatorPredicates.getSelectedPartitionKeys();
// different behavior for different external table types
if (selectPartitionKeys.isEmpty() && refScanOperator.getOpType() != OperatorType.LOGICAL_HIVE_SCAN) {
Expand All @@ -385,4 +413,35 @@ private List<PartitionKey> getMVCompensatePartitionsOfExternal(Set<String> refTa
}
return refTableCompensatePartitionKeys;
}

private List<PartitionKey> getMVCompensatePartitionsOfExternalWithoutPartitionPruner(
Table refBaseTable,
Set<String> refTablePartitionNamesToRefresh) {
MvBaseTableUpdateInfo baseTableUpdateInfo = mvUpdateInfo.getBaseTableUpdateInfos().get(refBaseTable);
if (baseTableUpdateInfo == null) {
return null;
}

Map<String, PCell> nameToPartitionKeys = baseTableUpdateInfo.getNameToPartKeys();
List<PartitionKey> partitionKeys = Lists.newArrayList();
try {
for (String partitionName : refTablePartitionNamesToRefresh) {
if (!nameToPartitionKeys.containsKey(partitionName)) {
return null;
}
PCell pCell = nameToPartitionKeys.get(partitionName);
if (pCell instanceof PRangeCell) {
partitionKeys.add(((PRangeCell) pCell).getRange().lowerEndpoint());
} else if (pCell instanceof PListCell) {
List<Column> partitionColumns = refBaseTable.getPartitionColumns();
partitionKeys.add(createPartitionKey(((PListCell) pCell).getPartitionItems().get(0), partitionColumns));
}
}
} catch (Exception e) {
logMVRewrite("Failed to get partition keys for ref base table: {}", refBaseTable.getName(),
DebugUtil.getStackTrace(e));
return null;
}
return partitionKeys;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -494,12 +493,6 @@ public Statistics getTableStatistics(OptimizerContext session, com.starrocks.cat
}
}

@Override
public List<PartitionKey> getPrunedPartitions(com.starrocks.catalog.Table table, ScalarOperator predicate,
long limit, TableVersionRange version) {
return new ArrayList<>();
}

public void addRowsToPartition(String dbName, String tableName, int rowCount, String partitionName) {
IcebergTable icebergTable = MOCK_TABLE_MAP.get(dbName).get(tableName).icebergTable;
Table nativeTable = icebergTable.getNativeTable();
Expand Down
Loading

0 comments on commit 8c16a01

Please sign in to comment.