Skip to content

Commit

Permalink
Eagerly resolve snapshots for Iceberg tables
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Oct 6, 2020
1 parent 358a953 commit e417db1
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.Optional;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.TypeSignature.mapType;
Expand All @@ -59,7 +58,7 @@ public class FilesTable
private final Table icebergTable;
private final Optional<Long> snapshotId;

public FilesTable(SchemaTableName tableName, Table icebergTable, Optional<Long> snapshotId, TypeManager typeManager)
public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional<Long> snapshotId)
{
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");

Expand Down Expand Up @@ -95,15 +94,21 @@ public ConnectorTableMetadata getTableMetadata()
@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId));
if (snapshotId.isEmpty()) {
return new FixedPageSource(ImmutableList.of());
}
return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId.get()));
}

private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional<Long> snapshotId)
private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, long snapshotId)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
TableScan tableScan = getTableScan(TupleDomain.all(), icebergTable, snapshotId).includeColumnStats();
Map<Integer, Type> idToTypeMapping = getIcebergIdToTypeMapping(icebergTable.schema());

TableScan tableScan = icebergTable.newScan()
.useSnapshot(snapshotId)
.includeColumnStats();

tableScan.planFiles().forEach(fileScanTask -> {
DataFile dataFile = fileScanTask.file();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
Expand All @@ -84,6 +85,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
Expand Down Expand Up @@ -174,42 +176,64 @@ public Optional<PrestoPrincipal> getSchemaOwner(ConnectorSession session, Catalo
@Override
public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
IcebergTableHandle handle = IcebergTableHandle.from(tableName);
Optional<Table> table = metastore.getTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName());
if (table.isEmpty()) {
IcebergTableName name = IcebergTableName.from(tableName.getTableName());
verify(name.getTableType() == DATA, "Wrong table type: " + name.getTableType());

Optional<Table> hiveTable = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), name.getTableName());
if (hiveTable.isEmpty()) {
return null;
}
if (handle.getTableType() != DATA) {
throw new PrestoException(NOT_SUPPORTED, "Table type not yet supported: " + handle.getSchemaTableNameWithType());
}
if (!isIcebergTable(table.get())) {
if (!isIcebergTable(hiveTable.get())) {
throw new UnknownTableTypeException(tableName);
}
return handle;

org.apache.iceberg.Table table = getIcebergTable(metastore, hdfsEnvironment, session, hiveTable.get().getSchemaTableName());
Optional<Long> snapshotId = getSnapshotId(table, name.getSnapshotId());

return new IcebergTableHandle(
tableName.getSchemaName(),
name.getTableName(),
name.getTableType(),
snapshotId,
TupleDomain.all());
}

@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
IcebergTableHandle table = IcebergTableHandle.from(tableName);
return getRawSystemTable(session, table)
return getRawSystemTable(session, tableName)
.map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
}

private Optional<SystemTable> getRawSystemTable(ConnectorSession session, IcebergTableHandle table)
private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName tableName)
{
org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName());
switch (table.getTableType()) {
case PARTITIONS:
return Optional.of(new PartitionTable(table, typeManager, icebergTable));
IcebergTableName name = IcebergTableName.from(tableName.getTableName());

Optional<Table> hiveTable = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), name.getTableName());
if (hiveTable.isEmpty() || !isIcebergTable(hiveTable.get())) {
return Optional.empty();
}

org.apache.iceberg.Table table = getIcebergTable(metastore, hdfsEnvironment, session, hiveTable.get().getSchemaTableName());

SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType());
switch (name.getTableType()) {
case HISTORY:
return Optional.of(new HistoryTable(table.getSchemaTableNameWithType(), icebergTable));
if (name.getSnapshotId().isPresent()) {
throw new PrestoException(NOT_SUPPORTED, "Snapshot ID not supported for history table: " + systemTableName);
}
return Optional.of(new HistoryTable(systemTableName, table));
case SNAPSHOTS:
return Optional.of(new SnapshotsTable(table.getSchemaTableNameWithType(), typeManager, icebergTable));
if (name.getSnapshotId().isPresent()) {
throw new PrestoException(NOT_SUPPORTED, "Snapshot ID not supported for snapshots table: " + systemTableName);
}
return Optional.of(new SnapshotsTable(systemTableName, typeManager, table));
case PARTITIONS:
return Optional.of(new PartitionTable(systemTableName, typeManager, table, getSnapshotId(table, name.getSnapshotId())));
case MANIFESTS:
return Optional.of(new ManifestsTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId()));
return Optional.of(new ManifestsTable(systemTableName, table, getSnapshotId(table, name.getSnapshotId())));
case FILES:
return Optional.of(new FilesTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId(), typeManager));
return Optional.of(new FilesTable(systemTableName, typeManager, table, getSnapshotId(table, name.getSnapshotId())));
}
return Optional.empty();
}
Expand Down Expand Up @@ -662,4 +686,11 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, handle.getSchemaTableName());
return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable);
}

/**
 * Picks the snapshot to read for a table reference.
 *
 * When the caller supplied a snapshot value, it is resolved through
 * {@code IcebergUtil.resolveSnapshotId}; otherwise the table's current
 * snapshot is used. The result is empty only when no value was supplied
 * and the table has no current snapshot.
 */
private static Optional<Long> getSnapshotId(org.apache.iceberg.Table table, Optional<Long> snapshotId)
{
    // Explicitly requested snapshot: resolve it and stop
    if (snapshotId.isPresent()) {
        long requested = snapshotId.get();
        return Optional.of(IcebergUtil.resolveSnapshotId(table, requested));
    }

    // Nothing requested: fall back to the current snapshot, if the table has one
    Snapshot current = table.currentSnapshot();
    if (current == null) {
        return Optional.empty();
    }
    return Optional.of(current.snapshotId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.prestosql.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
Expand All @@ -22,13 +23,14 @@
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.DynamicFilter;
import io.prestosql.spi.connector.FixedSplitSource;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;

import javax.inject.Inject;

import static io.prestosql.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.prestosql.plugin.iceberg.IcebergUtil.getIcebergTable;
import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan;
import static java.util.Objects.requireNonNull;

public class IcebergSplitManager
Expand All @@ -54,10 +56,16 @@ public ConnectorSplitSource getSplits(
{
IcebergTableHandle table = (IcebergTableHandle) handle;

if (table.getSnapshotId().isEmpty()) {
return new FixedSplitSource(ImmutableList.of());
}

HiveMetastore metastore = transactionManager.get(transaction).getMetastore();
Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName());

TableScan tableScan = getTableScan(table.getPredicate(), icebergTable, table.getSnapshotId());
TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(table.getPredicate()))
.useSnapshot(table.getSnapshotId().get());

// TODO Use residual. Right now there is no way to propagate residual to presto but at least we can
// propagate it at split level so the parquet pushdown can leverage it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

Expand Down Expand Up @@ -85,7 +86,7 @@ public SchemaTableName getSchemaTableName()

public SchemaTableName getSchemaTableNameWithType()
{
return new SchemaTableName(schemaName, tableName + "$" + tableType.name());
return new SchemaTableName(schemaName, tableName + "$" + tableType.name().toLowerCase(Locale.ROOT));
}

@Override
Expand Down Expand Up @@ -115,17 +116,6 @@ public int hashCode()
@Override
public String toString()
{
return getSchemaTableName().toString();
}

public static IcebergTableHandle from(SchemaTableName table)
{
IcebergTableName name = IcebergTableName.from(table.getTableName());
return new IcebergTableHandle(
table.getSchemaName(),
name.getTableName(),
name.getTableType(),
name.getSnapshotId(),
TupleDomain.all());
return getSchemaTableNameWithType() + "@" + snapshotId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ public Optional<Long> getSnapshotId()
return snapshotId;
}

/**
 * Returns the table name followed by {@code $} and the lower-cased
 * (locale-independent) name of the table type.
 */
public String getTableNameWithType()
{
    String typeSuffix = tableType.name().toLowerCase(Locale.ROOT);
    return tableName + "$" + typeSuffix;
}

/**
 * Renders this name as {@code <tableNameWithType>@<snapshotId>}, where the
 * snapshot part is the string form of the {@code snapshotId} field.
 */
@Override
public String toString()
{
    String nameWithType = getTableNameWithType();
    return nameWithType + "@" + String.valueOf(snapshotId);
}

public static IcebergTableName from(String name)
{
Matcher match = TABLE_PATTERN.matcher(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.TypeManager;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;

import java.util.List;
import java.util.Locale;
Expand All @@ -39,9 +38,11 @@
import java.util.regex.Pattern;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Streams.stream;
import static com.google.common.collect.Lists.reverse;
import static io.prestosql.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType;
import static java.lang.String.format;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
Expand All @@ -66,6 +67,19 @@ public static Table getIcebergTable(HiveMetastore metastore, HdfsEnvironment hdf
return new BaseTable(operations, quotedTableName(table));
}

/**
 * Resolves a user-supplied value to a concrete snapshot ID.
 *
 * If the value matches an existing snapshot ID it is returned unchanged.
 * Otherwise it is compared against history timestamps: the history list is
 * walked from its last entry to its first, and the snapshot ID of the first
 * entry whose timestamp (in milliseconds) is less than or equal to the value
 * is returned.
 *
 * @throws PrestoException with {@code ICEBERG_INVALID_SNAPSHOT_ID} when neither lookup succeeds
 */
public static long resolveSnapshotId(Table table, long snapshotId)
{
    // An exact snapshot ID match wins over the timestamp interpretation
    if (table.snapshot(snapshotId) != null) {
        return snapshotId;
    }

    List<HistoryEntry> history = table.history();
    for (int index = history.size() - 1; index >= 0; index--) {
        HistoryEntry entry = history.get(index);
        if (entry.timestampMillis() <= snapshotId) {
            return entry.snapshotId();
        }
    }

    throw new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, format("Invalid snapshot [%s] for table: %s", snapshotId, table));
}

public static List<IcebergColumnHandle> getColumns(Schema schema, TypeManager typeManager)
{
return schema.columns().stream()
Expand Down Expand Up @@ -110,21 +124,6 @@ public static Optional<String> getTableComment(Table table)
return Optional.ofNullable(table.properties().get(TABLE_COMMENT));
}

/**
 * Builds a table scan filtered by the given predicates.
 *
 * With no snapshot value the scan is returned as-is. With a value, the scan
 * is pinned via {@code useSnapshot} when the value is a known snapshot ID of
 * the table, and via {@code asOfTime} otherwise.
 */
public static TableScan getTableScan(TupleDomain<IcebergColumnHandle> predicates, Table icebergTable, Optional<Long> snapshotId)
{
    Expression filter = ExpressionConverter.toIcebergExpression(predicates);
    TableScan scan = icebergTable.newScan().filter(filter);

    if (!snapshotId.isPresent()) {
        return scan;
    }

    Long id = snapshotId.get();
    if (isSnapshot(icebergTable, id)) {
        return scan.useSnapshot(id);
    }
    // Not a known snapshot ID: interpret the value as a point in time
    return scan.asOfTime(id);
}

/**
 * Reports whether any snapshot of the table has the given snapshot ID.
 */
private static boolean isSnapshot(Table icebergTable, Long id)
{
    // Linear scan over the table's snapshots; stops at the first match
    for (org.apache.iceberg.Snapshot candidate : icebergTable.snapshots()) {
        if (candidate.snapshotId() == id) {
            return true;
        }
    }
    return false;
}

private static String quotedTableName(SchemaTableName name)
{
return quotedName(name.getSchemaName()) + "." + quotedName(name.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import java.util.Optional;

import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.BooleanType.BOOLEAN;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class ManifestsTable
Expand Down Expand Up @@ -93,21 +93,21 @@ public ConnectorTableMetadata getTableMetadata()
@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId));
if (snapshotId.isEmpty()) {
return new FixedPageSource(ImmutableList.of());
}
return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId.get()));
}

private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional<Long> snapshotId)
private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, long snapshotId)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);

Snapshot snapshot = snapshotId.map(icebergTable::snapshot)
.orElseGet(icebergTable::currentSnapshot);
Snapshot snapshot = icebergTable.snapshot(snapshotId);
if (snapshot == null) {
if (snapshotId.isPresent()) {
throw new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, "Invalid snapshot ID: " + snapshotId.get());
}
throw new PrestoException(ICEBERG_INVALID_METADATA, "There's no snapshot associated with table " + tableMetadata.getTable().toString());
throw new PrestoException(ICEBERG_INVALID_METADATA, format("Snapshot ID [%s] does not exist for table: %s", snapshotId, icebergTable));
}

Map<Integer, PartitionSpec> partitionSpecsById = icebergTable.specs();

snapshot.allManifests().forEach(file -> {
Expand Down
Loading

0 comments on commit e417db1

Please sign in to comment.