Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] (Part 2) Support Iceberg partition transform with multi partition columns #52966

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ public String toString() {
private Optional<Map<Table, List<SlotRef>>> refBaseTablePartitionSlotsOpt = Optional.empty();
// ref base table to partition columns
private Optional<Map<Table, List<Column>>> refBaseTablePartitionColumnsOpt = Optional.empty();
// cache table to base table info's mapping to refresh table
private transient volatile Map<Table, BaseTableInfo> tableToBaseTableInfoCache = Maps.newConcurrentMap();

// Materialized view's output columns may be different from defined query's output columns.
// Record the indexes based on materialized view's column output.
Expand Down Expand Up @@ -612,6 +614,7 @@ public void resetMetadataCache() {
refBaseTablePartitionExprsOpt = Optional.empty();
refBaseTablePartitionSlotsOpt = Optional.empty();
refBaseTablePartitionColumnsOpt = Optional.empty();
tableToBaseTableInfoCache.clear();
}

public String getInactiveReason() {
Expand Down Expand Up @@ -900,6 +903,7 @@ public void copyOnlyForQuery(OlapTable olapTable) {
mv.refBaseTablePartitionExprsOpt = this.refBaseTablePartitionExprsOpt;
mv.refBaseTablePartitionSlotsOpt = this.refBaseTablePartitionSlotsOpt;
mv.refBaseTablePartitionColumnsOpt = this.refBaseTablePartitionColumnsOpt;
mv.tableToBaseTableInfoCache = this.tableToBaseTableInfoCache;
}

@Override
Expand Down Expand Up @@ -1463,7 +1467,7 @@ public boolean containsBaseTable(TableName tableName) {
* @return table to the partition expr map, multi values if mv contains multi ref base tables, empty if it's un-partitioned
*/
public Map<Table, List<Expr>> getRefBaseTablePartitionExprs() {
return refBaseTablePartitionExprsOpt.orElse(Maps.newHashMap());
return refBaseTablePartitionExprsOpt.map(this::refreshBaseTable).orElse(Maps.newHashMap());
}

/**
Expand All @@ -1475,7 +1479,15 @@ public Map<Table, List<Expr>> getRefBaseTablePartitionExprs() {
* un-partitioned
*/
public Map<Table, List<SlotRef>> getRefBaseTablePartitionSlots() {
return refBaseTablePartitionSlotsOpt.orElse(Maps.newHashMap());
return refBaseTablePartitionSlotsOpt.map(this::refreshBaseTable).orElse(Maps.newHashMap());
}

/**
 * Get the related partition table(s) and column(s) of the materialized view; one MV can
 * reference multiple ref base tables, so the map may contain several entries.
 * NOTE: the ref-base-table columns are returned in the same order as the MV's defined
 * partition columns.
 */
public Map<Table, List<Column>> getRefBaseTablePartitionColumns() {
return refBaseTablePartitionColumnsOpt.map(this::refreshBaseTable).orElse(Maps.newHashMap());
}

/**
Expand Down Expand Up @@ -1556,14 +1568,6 @@ private boolean isManyToManyPartitionRangeMapping(PRangeCell srcRange,
return false;
}

/**
* Get the related partition table and column of the materialized view since one mv can contain multi ref base tables.
* NOTE: The ref-base-table columns' order is guaranteed as the order of mv's defined partition columns' order.
*/
public Map<Table, List<Column>> getRefBaseTablePartitionColumns() {
return refBaseTablePartitionColumnsOpt.orElse(Maps.newLinkedHashMap());
}

private Map<Table, List<Column>> getBaseTablePartitionColumnMapImpl() {
Map<Table, List<Column>> result = Maps.newHashMap();
if (partitionExprMaps == null || partitionExprMaps.isEmpty()) {
Expand Down Expand Up @@ -1752,6 +1756,10 @@ public void gsonPostProcess() throws IOException {
*/
private void analyzePartitionExprs() {
try {
// initialize table to base table info cache
for (BaseTableInfo tableInfo : this.baseTableInfos) {
this.tableToBaseTableInfoCache.put(MvUtils.getTableChecked(tableInfo), tableInfo);
}
// analyze partition exprs for ref base tables
analyzeRefBaseTablePartitionExprs();
// analyze partition exprs
Expand Down Expand Up @@ -1784,6 +1792,25 @@ private void analyzePartitionExprs() {
}
}

/**
 * Re-resolve external tables (Iceberg / DeltaLake) cached inside the Optional so each query
 * sees fresh table metadata; all other table types are passed through unchanged.
 * TODO: optimize for callers that do not care about the table's freshness.
 *
 * @param cached map keyed by the (possibly stale) cached table
 * @param <K>    value type carried alongside each table
 * @return a new map with the same values, keyed by refreshed tables where applicable
 */
private <K> Map<Table, K> refreshBaseTable(Map<Table, K> cached) {
    Map<Table, K> result = Maps.newHashMapWithExpectedSize(cached.size());
    for (Map.Entry<Table, K> e : cached.entrySet()) {
        Table table = e.getKey();
        if (table instanceof IcebergTable || table instanceof DeltaLakeTable) {
            // Single lookup instead of containsKey + get, and fail with a descriptive message.
            BaseTableInfo baseTableInfo = tableToBaseTableInfoCache.get(table);
            Preconditions.checkState(baseTableInfo != null,
                    "Base table info not found in cache for table %s", table.getName());
            result.put(MvUtils.getTableChecked(baseTableInfo), e.getValue());
        } else {
            result.put(table, e.getValue());
        }
    }
    return result;
}

private void analyzeRefBaseTablePartitionExprs() {
Map<Table, List<Expr>> refBaseTablePartitionExprMap = Maps.newHashMap();
for (BaseTableInfo tableInfo : baseTableInfos) {
Expand All @@ -1803,17 +1830,24 @@ private void analyzeRefBaseTablePartitionExprs() {

public void analyzeRefBaseTablePartitionSlots() {
Map<Table, List<SlotRef>> refBaseTablePartitionSlotMap = Maps.newHashMap();
Preconditions.checkState(refBaseTablePartitionExprsOpt.isPresent());
Map<Table, List<Expr>> refBaseTablePartitionExprMap = refBaseTablePartitionExprsOpt.get();
for (BaseTableInfo tableInfo : baseTableInfos) {
Table table = MvUtils.getTableChecked(tableInfo);
List<MVPartitionExpr> mvPartitionExprs = MvUtils.getMvPartitionExpr(partitionExprMaps, table);
List<Expr> mvPartitionExprs = refBaseTablePartitionExprMap.get(table);
if (CollectionUtils.isEmpty(mvPartitionExprs)) {
LOG.info("Base table {} contains no partition expr, skip", table.getName());
continue;
}
List<SlotRef> slotRefs = mvPartitionExprs
.stream()
.map(MVPartitionExpr::getSlotRef)
.collect(Collectors.toList());
List<SlotRef> slotRefs = Lists.newArrayList();
for (Expr expr : mvPartitionExprs) {
List<SlotRef> exprSlotRefs = expr.collectAllSlotRefs();
if (exprSlotRefs.size() != 1) {
LOG.warn("The partition expr {} of table {} contains more than one slot ref, skip", expr, table.getName());
continue;
}
slotRefs.add(exprSlotRefs.get(0));
}
refBaseTablePartitionSlotMap.put(table, slotRefs);
}
LOG.info("The refBaseTablePartitionSlotMap of mv {} is {}", getName(), refBaseTablePartitionSlotMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,7 @@ public void addRangePartitionKeys(String partitionName,

/**
* Add partition name that needs to be refreshed and its associated list partition key
* @param partitionName base table partition name
* @param listPartitionKey the associated list partition
*/
public void addListPartitionKeys(String partitionName,
PListCell listPartitionKey) {
nameToPartKeys.put(partitionName, listPartitionKey);
}

/**
 * Add all base-table partition names that need to be refreshed together with their
 * associated list partition keys.
 * @param listPartitionKeys mapping from partition name to its list partition key
 */
public void addListPartitionKeys(Map<String, PListCell> listPartitionKeys) {
nameToPartKeys.putAll(listPartitionKeys);
}
Expand All @@ -107,7 +100,7 @@ public Map<String, Range<PartitionKey>> getPartitionNameWithRanges() {
public Map<String, PListCell> getPartitionNameWithLists() {
Map<String, PListCell> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : nameToPartKeys.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PRangeCell);
Preconditions.checkState(e.getValue() instanceof PListCell);
PListCell listCell = (PListCell) e.getValue();
result.put(e.getKey(), listCell);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ protected Map<Table, Set<String>> collectBaseTableUpdatePartitionNames(Map<Table
MvUpdateInfo mvUpdateInfo) {
Map<Table, Set<String>> baseChangedPartitionNames = Maps.newHashMap();
for (Table baseTable : refBaseTableAndColumns.keySet()) {
// TODO(review): empty TODO left here — describe the intended follow-up or remove it
MvBaseTableUpdateInfo mvBaseTableUpdateInfo = getMvBaseTableUpdateInfo(mv, baseTable,
true, isQueryRewrite);
mvUpdateInfo.getBaseTableUpdateInfos().put(baseTable, mvBaseTableUpdateInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public String getTableName() {

public abstract String getDbName();

@Deprecated
public abstract PartitionKey createPartitionKeyWithType(List<String> values, List<Type> types) throws AnalysisException;

public abstract PartitionKey createPartitionKey(List<String> partitionValues, List<Column> partitionColumns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,15 @@ public enum DateTimeInterval {

public static final String MYSQL_PARTITION_MAXVALUE = "MAXVALUE";

/**
 * Create a partition key for the given values and columns, assuming a HIVE table type.
 *
 * @deprecated callers should use the {@code createPartitionKey} overload that receives the
 *             table (or table type) explicitly instead of assuming HIVE.
 */
@Deprecated
public static PartitionKey createPartitionKey(List<String> values, List<Column> columns) throws AnalysisException {
return createPartitionKey(values, columns, Table.TableType.HIVE);
}

/**
* Use createPartitionKey instead.
*/
@Deprecated
public static PartitionKey createPartitionKeyWithType(List<String> values, List<Type> types,
Table.TableType tableType) throws AnalysisException {
return ConnectorPartitionTraits.build(tableType).createPartitionKeyWithType(values, types);
Expand Down Expand Up @@ -406,7 +411,7 @@ public static Map<String, Set<String>> getMVPartitionNameMapOfExternalTable(Tabl
.map(p -> getPartitionNameForJDBCTable(partitionColumns.get(p), partitionName))
.collect(Collectors.toList()),
partitionColumnIdxes.stream().map(partitionColumns::get).collect(Collectors.toList()),
table.getType());
table);
String mvPartitionName = generateMVPartitionName(partitionKey);
mvPartitionKeySetMap.computeIfAbsent(mvPartitionName, x -> Sets.newHashSet())
.add(partitionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
import static com.starrocks.catalog.Table.TableType.ICEBERG;
import static com.starrocks.common.profile.Tracers.Module.EXTERNAL;
import static com.starrocks.connector.ColumnTypeConverter.fromIcebergType;
import static com.starrocks.connector.PartitionUtil.createPartitionKeyWithType;
import static com.starrocks.connector.PartitionUtil.createPartitionKey;
import static com.starrocks.connector.iceberg.IcebergApiConverter.filterManifests;
import static com.starrocks.connector.iceberg.IcebergApiConverter.mayHaveEqualityDeletes;
import static com.starrocks.connector.iceberg.IcebergApiConverter.parsePartitionFields;
Expand Down Expand Up @@ -772,7 +772,7 @@ public List<PartitionKey> getPrunedPartitions(Table table, ScalarOperator predic
.collect(Collectors.toList());
}

partitionKeys.add(createPartitionKeyWithType(values, srTypes, table.getType()));
partitionKeys.add(createPartitionKey(values, partitionColumns, table));
} catch (Exception e) {
LOG.error("create partition key failed.", e);
throw new StarRocksConnectorException(e.getMessage());
Expand Down
Loading
Loading