From e417db19ca63d680daf6765f313f52a776728efc Mon Sep 17 00:00:00 2001 From: David Phillips Date: Wed, 23 Sep 2020 23:26:36 -0700 Subject: [PATCH] Eagerly resolve snapshots for Iceberg tables --- .../prestosql/plugin/iceberg/FilesTable.java | 15 ++-- .../plugin/iceberg/IcebergMetadata.java | 69 ++++++++++++++----- .../plugin/iceberg/IcebergSplitManager.java | 12 +++- .../plugin/iceberg/IcebergTableHandle.java | 16 +---- .../plugin/iceberg/IcebergTableName.java | 11 +++ .../prestosql/plugin/iceberg/IcebergUtil.java | 37 +++++----- .../plugin/iceberg/ManifestsTable.java | 18 ++--- .../plugin/iceberg/PartitionTable.java | 17 +++-- .../plugin/iceberg/TableStatisticsMaker.java | 10 ++- 9 files changed, 129 insertions(+), 76 deletions(-) diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/FilesTable.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/FilesTable.java index b452b4a0e834c..c6824e58b9ebd 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/FilesTable.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/FilesTable.java @@ -44,7 +44,6 @@ import java.util.Optional; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan; import static io.prestosql.spi.type.BigintType.BIGINT; import static io.prestosql.spi.type.IntegerType.INTEGER; import static io.prestosql.spi.type.TypeSignature.mapType; @@ -59,7 +58,7 @@ public class FilesTable private final Table icebergTable; private final Optional snapshotId; - public FilesTable(SchemaTableName tableName, Table icebergTable, Optional snapshotId, TypeManager typeManager) + public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional snapshotId) { this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); @@ -95,15 +94,21 @@ public ConnectorTableMetadata getTableMetadata() @Override public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) { - return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId)); + if (snapshotId.isEmpty()) { + return new FixedPageSource(ImmutableList.of()); + } + return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId.get())); } - private static List buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional snapshotId) + private static List buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, long snapshotId) { PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); - TableScan tableScan = getTableScan(TupleDomain.all(), icebergTable, snapshotId).includeColumnStats(); Map idToTypeMapping = getIcebergIdToTypeMapping(icebergTable.schema()); + TableScan tableScan = icebergTable.newScan() + .useSnapshot(snapshotId) + .includeColumnStats(); + tableScan.planFiles().forEach(fileScanTask -> { DataFile dataFile = fileScanTask.file(); diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java index 3380acdf92868..94383924e7962 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java @@ -64,6 +64,7 @@ import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; @@ -84,6 +85,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiPredicate; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; @@ -174,42 +176,64 @@ public Optional getSchemaOwner(ConnectorSession session, Catalo @Override public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { - IcebergTableHandle handle = IcebergTableHandle.from(tableName); - Optional table = metastore.getTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName()); - if (table.isEmpty()) { + IcebergTableName name = IcebergTableName.from(tableName.getTableName()); + verify(name.getTableType() == DATA, "Wrong table type: " + name.getTableType()); + + Optional
hiveTable = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), name.getTableName()); + if (hiveTable.isEmpty()) { return null; } - if (handle.getTableType() != DATA) { - throw new PrestoException(NOT_SUPPORTED, "Table type not yet supported: " + handle.getSchemaTableNameWithType()); - } - if (!isIcebergTable(table.get())) { + if (!isIcebergTable(hiveTable.get())) { throw new UnknownTableTypeException(tableName); } - return handle; + + org.apache.iceberg.Table table = getIcebergTable(metastore, hdfsEnvironment, session, hiveTable.get().getSchemaTableName()); + Optional snapshotId = getSnapshotId(table, name.getSnapshotId()); + + return new IcebergTableHandle( + tableName.getSchemaName(), + name.getTableName(), + name.getTableType(), + snapshotId, + TupleDomain.all()); } @Override public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) { - IcebergTableHandle table = IcebergTableHandle.from(tableName); - return getRawSystemTable(session, table) + return getRawSystemTable(session, tableName) .map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader())); } - private Optional getRawSystemTable(ConnectorSession session, IcebergTableHandle table) + private Optional getRawSystemTable(ConnectorSession session, SchemaTableName tableName) { - org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName()); - switch (table.getTableType()) { - case PARTITIONS: - return Optional.of(new PartitionTable(table, typeManager, icebergTable)); + IcebergTableName name = IcebergTableName.from(tableName.getTableName()); + + Optional
hiveTable = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), name.getTableName()); + if (hiveTable.isEmpty() || !isIcebergTable(hiveTable.get())) { + return Optional.empty(); + } + + org.apache.iceberg.Table table = getIcebergTable(metastore, hdfsEnvironment, session, hiveTable.get().getSchemaTableName()); + + SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType()); + switch (name.getTableType()) { case HISTORY: - return Optional.of(new HistoryTable(table.getSchemaTableNameWithType(), icebergTable)); + if (name.getSnapshotId().isPresent()) { + throw new PrestoException(NOT_SUPPORTED, "Snapshot ID not supported for history table: " + systemTableName); + } + return Optional.of(new HistoryTable(systemTableName, table)); case SNAPSHOTS: - return Optional.of(new SnapshotsTable(table.getSchemaTableNameWithType(), typeManager, icebergTable)); + if (name.getSnapshotId().isPresent()) { + throw new PrestoException(NOT_SUPPORTED, "Snapshot ID not supported for snapshots table: " + systemTableName); + } + return Optional.of(new SnapshotsTable(systemTableName, typeManager, table)); + case PARTITIONS: + return Optional.of(new PartitionTable(systemTableName, typeManager, table, getSnapshotId(table, name.getSnapshotId()))); case MANIFESTS: - return Optional.of(new ManifestsTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId())); + return Optional.of(new ManifestsTable(systemTableName, table, getSnapshotId(table, name.getSnapshotId()))); case FILES: - return Optional.of(new FilesTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId(), typeManager)); + return Optional.of(new FilesTable(systemTableName, typeManager, table, getSnapshotId(table, name.getSnapshotId()))); } return Optional.empty(); } @@ -662,4 +686,11 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, handle.getSchemaTableName()); return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable); } + + private static Optional getSnapshotId(org.apache.iceberg.Table table, Optional snapshotId) + { + return snapshotId + .map(id -> IcebergUtil.resolveSnapshotId(table, id)) + .or(() -> Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId)); + } } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java index dc5d9ea725c5c..8f807008178f8 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java @@ -13,6 +13,7 @@ */ package io.prestosql.plugin.iceberg; +import com.google.common.collect.ImmutableList; import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource; import io.prestosql.plugin.hive.HdfsEnvironment; import io.prestosql.plugin.hive.metastore.HiveMetastore; @@ -22,13 +23,14 @@ import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.spi.connector.ConnectorTransactionHandle; import io.prestosql.spi.connector.DynamicFilter; +import io.prestosql.spi.connector.FixedSplitSource; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import javax.inject.Inject; +import static io.prestosql.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.prestosql.plugin.iceberg.IcebergUtil.getIcebergTable; -import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan; import static java.util.Objects.requireNonNull; public class IcebergSplitManager @@ -54,10 +56,16 @@ public ConnectorSplitSource getSplits( { IcebergTableHandle table = (IcebergTableHandle) handle; + if (table.getSnapshotId().isEmpty()) { + return new FixedSplitSource(ImmutableList.of()); + } + HiveMetastore metastore = transactionManager.get(transaction).getMetastore(); Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName()); - TableScan tableScan = getTableScan(table.getPredicate(), icebergTable, table.getSnapshotId()); + TableScan tableScan = icebergTable.newScan() + .filter(toIcebergExpression(table.getPredicate())) + .useSnapshot(table.getSnapshotId().get()); // TODO Use residual. Right now there is no way to propagate residual to presto but at least we can // propagate it at split level so the parquet pushdown can leverage it. diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java index 0a20c4803366b..a3b0e47fad7cb 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java @@ -19,6 +19,7 @@ import io.prestosql.spi.connector.SchemaTableName; import io.prestosql.spi.predicate.TupleDomain; +import java.util.Locale; import java.util.Objects; import java.util.Optional; @@ -85,7 +86,7 @@ public SchemaTableName getSchemaTableName() public SchemaTableName getSchemaTableNameWithType() { - return new SchemaTableName(schemaName, tableName + "$" + tableType.name()); + return new SchemaTableName(schemaName, tableName + "$" + tableType.name().toLowerCase(Locale.ROOT)); } @Override @@ -115,17 +116,6 @@ public int hashCode() @Override public String toString() { - return getSchemaTableName().toString(); - } - - public static IcebergTableHandle from(SchemaTableName table) - { - IcebergTableName name = IcebergTableName.from(table.getTableName()); - return new IcebergTableHandle( - table.getSchemaName(), - name.getTableName(), - name.getTableType(), - name.getSnapshotId(), - TupleDomain.all()); + return getSchemaTableNameWithType() + "@" + snapshotId; } } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableName.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableName.java index 54b389f8619b8..f0c13d1e259f6 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableName.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableName.java @@ -58,6 +58,17 @@ public Optional getSnapshotId() return snapshotId; } + public String getTableNameWithType() + { + return tableName + "$" + tableType.name().toLowerCase(Locale.ROOT); + } + + @Override + public String toString() + { + return getTableNameWithType() + "@" + snapshotId; + } + public static IcebergTableName from(String name) { Matcher match = TABLE_PATTERN.matcher(name); diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java index a0c48ded5f578..e7fbec1644be9 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java @@ -18,19 +18,18 @@ import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext; import io.prestosql.plugin.hive.authentication.HiveIdentity; import io.prestosql.plugin.hive.metastore.HiveMetastore; +import io.prestosql.spi.PrestoException; import io.prestosql.spi.connector.ConnectorSession; import io.prestosql.spi.connector.SchemaTableName; -import io.prestosql.spi.predicate.TupleDomain; import io.prestosql.spi.type.TypeManager; import org.apache.iceberg.BaseTable; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.HistoryEntry; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.expressions.Expression; import java.util.List; import java.util.Locale; @@ -39,9 +38,11 @@ import java.util.regex.Pattern; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.Streams.stream; +import static com.google.common.collect.Lists.reverse; import static io.prestosql.plugin.hive.HiveMetadata.TABLE_COMMENT; +import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType; +import static java.lang.String.format; import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; @@ -66,6 +67,19 @@ public static Table getIcebergTable(HiveMetastore metastore, HdfsEnvironment hdf return new BaseTable(operations, quotedTableName(table)); } + public static long resolveSnapshotId(Table table, long snapshotId) + { + if (table.snapshot(snapshotId) != null) { + return snapshotId; + } + + return reverse(table.history()).stream() + .filter(entry -> entry.timestampMillis() <= snapshotId) + .map(HistoryEntry::snapshotId) + .findFirst() + .orElseThrow(() -> new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, format("Invalid snapshot [%s] for table: %s", snapshotId, table))); + } + public static List getColumns(Schema schema, TypeManager typeManager) { return schema.columns().stream() @@ -110,21 +124,6 @@ public static Optional getTableComment(Table table) return Optional.ofNullable(table.properties().get(TABLE_COMMENT)); } - public static TableScan getTableScan(TupleDomain predicates, Table icebergTable, Optional snapshotId) - { - Expression expression = ExpressionConverter.toIcebergExpression(predicates); - TableScan tableScan = icebergTable.newScan().filter(expression); - return snapshotId - .map(id -> isSnapshot(icebergTable, id) ? tableScan.useSnapshot(id) : tableScan.asOfTime(id)) - .orElse(tableScan); - } - - private static boolean isSnapshot(Table icebergTable, Long id) - { - return stream(icebergTable.snapshots()) - .anyMatch(snapshot -> snapshot.snapshotId() == id); - } - private static String quotedTableName(SchemaTableName name) { return quotedName(name.getSchemaName()) + "." + quotedName(name.getTableName()); diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ManifestsTable.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ManifestsTable.java index e70bd3787bfe7..8debd072cc51e 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ManifestsTable.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ManifestsTable.java @@ -42,11 +42,11 @@ import java.util.Optional; import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; -import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; import static io.prestosql.spi.type.BigintType.BIGINT; import static io.prestosql.spi.type.BooleanType.BOOLEAN; import static io.prestosql.spi.type.IntegerType.INTEGER; import static io.prestosql.spi.type.VarcharType.VARCHAR; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class ManifestsTable @@ -93,21 +93,21 @@ public ConnectorTableMetadata getTableMetadata() @Override public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) { - return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId)); + if (snapshotId.isEmpty()) { + return new FixedPageSource(ImmutableList.of()); + } + return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId.get())); } - private static List buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional snapshotId) + private static List buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, long snapshotId) { PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); - Snapshot snapshot = snapshotId.map(icebergTable::snapshot) - .orElseGet(icebergTable::currentSnapshot); + Snapshot snapshot = icebergTable.snapshot(snapshotId); if (snapshot == null) { - if (snapshotId.isPresent()) { - throw new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, "Invalid snapshot ID: " + snapshotId.get()); - } - throw new PrestoException(ICEBERG_INVALID_METADATA, "There's no snapshot associated with table " + tableMetadata.getTable().toString()); + throw new PrestoException(ICEBERG_INVALID_METADATA, format("Snapshot ID [%s] does not exist for table: %s", snapshotId, icebergTable)); } + Map partitionSpecsById = icebergTable.specs(); snapshot.allManifests().forEach(file -> { diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java index 5a4d33e6d4cf4..0ed8b447922ec 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java @@ -22,6 +22,7 @@ import io.prestosql.spi.connector.ConnectorTransactionHandle; import io.prestosql.spi.connector.InMemoryRecordSet; import io.prestosql.spi.connector.RecordCursor; +import io.prestosql.spi.connector.SchemaTableName; import io.prestosql.spi.connector.SystemTable; import io.prestosql.spi.predicate.TupleDomain; import io.prestosql.spi.type.RowType; @@ -53,7 +54,6 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.prestosql.plugin.iceberg.IcebergUtil.getIdentityPartitions; -import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan; import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType; import static io.prestosql.plugin.iceberg.util.Timestamps.timestampTzFromMicros; import static io.prestosql.spi.type.BigintType.BIGINT; @@ -64,9 +64,9 @@ public class PartitionTable implements SystemTable { - private final IcebergTableHandle tableHandle; private final TypeManager typeManager; private final Table icebergTable; + private final Optional snapshotId; private final Map idToTypeMapping; private final List nonPartitionPrimitiveColumns; private final List partitionColumnTypes; @@ -74,11 +74,11 @@ public class PartitionTable private final List columnMetricTypes; private final ConnectorTableMetadata connectorTableMetadata; - public PartitionTable(IcebergTableHandle tableHandle, TypeManager typeManager, Table icebergTable) + public PartitionTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional snapshotId) { - this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); + this.snapshotId = requireNonNull(snapshotId, "snapshotId is null"); this.idToTypeMapping = icebergTable.schema().columns().stream() .filter(column -> column.type().isPrimitiveType()) .collect(Collectors.toMap(Types.NestedField::fieldId, (column) -> column.type().asPrimitiveType())); @@ -114,7 +114,7 @@ public PartitionTable(IcebergTableHandle tableHandle, TypeManager typeManager, T this.resultTypes = columnMetadata.stream() .map(ColumnMetadata::getType) .collect(toImmutableList()); - this.connectorTableMetadata = new ConnectorTableMetadata(tableHandle.getSchemaTableNameWithType(), columnMetadata); + this.connectorTableMetadata = new ConnectorTableMetadata(tableName, columnMetadata); } @Override @@ -152,7 +152,12 @@ private List getColumnMetadata(List columns) public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) { // TODO instead of cursor use pageSource method. - TableScan tableScan = getTableScan(TupleDomain.all(), icebergTable, tableHandle.getSnapshotId()).includeColumnStats(); + if (snapshotId.isEmpty()) { + return new InMemoryRecordSet(resultTypes, ImmutableList.of()).cursor(); + } + TableScan tableScan = icebergTable.newScan() + .useSnapshot(snapshotId.get()) + .includeColumnStats(); return buildRecordCursor(getPartitions(tableScan), icebergTable.spec().fields()); } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableStatisticsMaker.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableStatisticsMaker.java index 81fd5db1d568d..137397f895099 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableStatisticsMaker.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableStatisticsMaker.java @@ -46,9 +46,9 @@ import java.util.stream.Collectors; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.prestosql.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.prestosql.plugin.iceberg.IcebergUtil.getColumns; import static io.prestosql.plugin.iceberg.IcebergUtil.getIdentityPartitions; -import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan; import static io.prestosql.plugin.iceberg.Partition.toMap; import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType; import static java.util.Objects.requireNonNull; @@ -74,7 +74,7 @@ public static TableStatistics getTableStatistics(TypeManager typeManager, Constr private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Constraint constraint) { - if (constraint.getSummary().isNone()) { + if (tableHandle.getSnapshotId().isEmpty() || constraint.getSummary().isNone()) { return TableStatistics.empty(); } @@ -119,7 +119,11 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons } Map idToDetails = idToDetailsBuilder.build(); - TableScan tableScan = getTableScan(intersection, icebergTable, tableHandle.getSnapshotId()).includeColumnStats(); + TableScan tableScan = icebergTable.newScan() + .filter(toIcebergExpression(intersection)) + .useSnapshot(tableHandle.getSnapshotId().get()) + .includeColumnStats(); + Partition summary = null; try (CloseableIterable fileScanTasks = tableScan.planFiles()) { for (FileScanTask fileScanTask : fileScanTasks) {