Skip to content

Commit

Permalink
Cache Iceberg column types in Glue for faster access
Browse files Browse the repository at this point in the history
Make `information_schema.columns` queries faster by storing the
necessary information directly in Glue so that loading Iceberg metadata
from storage is not need.

- Table comment is stored as `comment` Glue table parameter.

- Column Trino type is stored as Glue column type (for some cases) or as
  `trino_type` column table parameter. This is because, following
  Iceberg own's `org.apache.iceberg.aws.glue.IcebergToGlueConverter.toTypeString`
  the column type written to Glue is not accurate, so this piece of
  information may be lossy. In such cases the column parameter is used
  to store the Trino type.
  - For more compatibility we could store Iceberg type string, but
    Iceberg lacks API to reconstruct Type from string (except for
    primitive types). This is not surprising, as it needs IDs for all
    fields. Something that is not needed to answer metadata queries.

- Column NOT NULL constraint is stored as `trino_not_null=true` column table
  parameter (omitted for nullable columns).

Before the above cached information is used, the following conditions
are checked

- the `trino_table_metadata_info_valid_for` table property must be set
  to current metadata location. This ensures that the cached information
  is invalided whenever metadata location changes.

- at least one column must have `trino_type` property set. This ensures
  the cached information is not used when column parameters were lost in
  transit or otherwise erased.

`iceberg.glue.cache-table-metadata` serves as a kill-switch for the new
functionality (both write & read parts).
  • Loading branch information
findepi committed Jul 31, 2023
1 parent ff22b8b commit 7c32725
Show file tree
Hide file tree
Showing 19 changed files with 447 additions and 46 deletions.
5 changes: 5 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@
<artifactId>jjwt-jackson</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-cache</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import io.airlift.json.JsonCodec;
Expand Down Expand Up @@ -171,7 +172,6 @@
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -300,6 +300,7 @@ public class IcebergMetadata
private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME);

private static final Integer DELETE_BATCH_SIZE = 1000;
public static final int GET_METADATA_BATCH_SIZE = 1000;

private final TypeManager typeManager;
private final CatalogHandle trinoCatalogHandle;
Expand Down Expand Up @@ -649,31 +650,49 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
else {
schemaTableNames = ImmutableList.of(prefix.toSchemaTableName());
}
return schemaTableNames.stream()
.flatMap(tableName -> {
try {

return Lists.partition(schemaTableNames, GET_METADATA_BATCH_SIZE).stream()
.map(tableBatch -> {
ImmutableList.Builder<TableColumnsMetadata> tableMetadatas = ImmutableList.builderWithExpectedSize(tableBatch.size());
Set<SchemaTableName> remainingTables = new HashSet<>(tableBatch.size());
for (SchemaTableName tableName : tableBatch) {
if (redirectTable(session, tableName).isPresent()) {
return Stream.of(TableColumnsMetadata.forRedirectedTable(tableName));
tableMetadatas.add(TableColumnsMetadata.forRedirectedTable(tableName));
}
else {
remainingTables.add(tableName);
}

Table icebergTable = catalog.loadTable(session, tableName);
List<ColumnMetadata> columns = getColumnMetadatas(icebergTable.schema());
return Stream.of(TableColumnsMetadata.forTable(tableName, columns));
}
catch (TableNotFoundException e) {
// Table disappeared during listing operation
return Stream.empty();
}
catch (UnknownTableTypeException e) {
// Skip unsupported table type in case that the table redirects are not enabled
return Stream.empty();
}
catch (RuntimeException e) {
// Table can be being removed and this may cause all sorts of exceptions. Log, because we're catching broadly.
log.warn(e, "Failed to access metadata of table %s during streaming table columns for %s", tableName, prefix);
return Stream.empty();

Map<SchemaTableName, List<ColumnMetadata>> loaded = catalog.tryGetColumnMetadata(session, ImmutableList.copyOf(remainingTables));
loaded.forEach((tableName, columns) -> {
remainingTables.remove(tableName);
tableMetadatas.add(TableColumnsMetadata.forTable(tableName, columns));
});

for (SchemaTableName tableName : remainingTables) {
try {
Table icebergTable = catalog.loadTable(session, tableName);
List<ColumnMetadata> columns = getColumnMetadatas(icebergTable.schema());
tableMetadatas.add(TableColumnsMetadata.forTable(tableName, columns));
}
catch (TableNotFoundException e) {
// Table disappeared during listing operation
continue;
}
catch (UnknownTableTypeException e) {
// Skip unsupported table type in case that the table redirects are not enabled
continue;
}
catch (RuntimeException e) {
// Table can be being removed and this may cause all sorts of exceptions. Log, because we're catching broadly.
log.warn(e, "Failed to access metadata of table %s during streaming table columns for %s", tableName, prefix);
continue;
}
}
return tableMetadatas.build();
})
.flatMap(List::stream)
.iterator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@

public final class IcebergUtil
{
public static final String TRINO_TABLE_METADATA_INFO_VALID_FOR = "trino_table_metadata_info_valid_for";
public static final String COLUMN_TRINO_NOT_NULL_PROPERTY = "trino_not_null";
public static final String COLUMN_TRINO_TYPE_ID_PROPERTY = "trino_type_id";

public static final String METADATA_FOLDER_NAME = "metadata";
public static final String METADATA_FILE_EXTENSION = ".metadata.json";
private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.trino.plugin.iceberg.ColumnIdentity;
import io.trino.plugin.iceberg.UnknownTableTypeException;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorViewDefinition;
Expand Down Expand Up @@ -96,6 +97,11 @@ Transaction newCreateTableTransaction(
*/
Table loadTable(ConnectorSession session, SchemaTableName schemaTableName);

/**
* Bulk load column metadata. The returned map may contain fewer entries then asked for.
*/
Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(ConnectorSession session, List<SchemaTableName> tables);

void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional<String> comment);

void updateViewComment(ConnectorSession session, SchemaTableName schemaViewName, Optional<String> comment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.amazonaws.services.glue.model.ConcurrentModificationException;
import com.amazonaws.services.glue.model.CreateTableRequest;
import com.amazonaws.services.glue.model.EntityNotFoundException;
import com.amazonaws.services.glue.model.GetTableRequest;
import com.amazonaws.services.glue.model.InvalidInputException;
import com.amazonaws.services.glue.model.ResourceNumberLimitExceededException;
import com.amazonaws.services.glue.model.Table;
Expand All @@ -30,7 +29,9 @@
import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.type.TypeManager;
import jakarta.annotation.Nullable;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
Expand All @@ -56,15 +57,21 @@
public class GlueIcebergTableOperations
extends AbstractIcebergTableOperations
{
private final TypeManager typeManager;
private final boolean cacheTableMetadata;
private final AWSGlueAsync glueClient;
private final GlueMetastoreStats stats;
private final GetGlueTable getGlueTable;

@Nullable
private String glueVersionId;

protected GlueIcebergTableOperations(
TypeManager typeManager,
boolean cacheTableMetadata,
AWSGlueAsync glueClient,
GlueMetastoreStats stats,
GetGlueTable getGlueTable,
FileIO fileIo,
ConnectorSession session,
String database,
Expand All @@ -73,14 +80,17 @@ protected GlueIcebergTableOperations(
Optional<String> location)
{
super(fileIo, session, database, table, owner, location);
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.cacheTableMetadata = cacheTableMetadata;
this.glueClient = requireNonNull(glueClient, "glueClient is null");
this.stats = requireNonNull(stats, "stats is null");
this.getGlueTable = requireNonNull(getGlueTable, "getGlueTable is null");
}

@Override
protected String getRefreshedLocation(boolean invalidateCaches)
{
Table table = getTable();
Table table = getTable(invalidateCaches);
glueVersionId = table.getVersionId();

Map<String, String> parameters = getTableParameters(table);
Expand All @@ -104,7 +114,7 @@ protected void commitNewTable(TableMetadata metadata)
{
verify(version.isEmpty(), "commitNewTable called on a table which already exists");
String newMetadataLocation = writeNewMetadata(metadata, 0);
TableInput tableInput = getTableInput(tableName, owner, metadata, newMetadataLocation, ImmutableMap.of());
TableInput tableInput = getTableInput(typeManager, tableName, owner, metadata, newMetadataLocation, ImmutableMap.of(), cacheTableMetadata);

CreateTableRequest createTableRequest = new CreateTableRequest()
.withDatabaseName(database)
Expand All @@ -128,11 +138,13 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1);
TableInput tableInput = getTableInput(
typeManager,
tableName,
owner,
metadata,
newMetadataLocation,
ImmutableMap.of(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation));
ImmutableMap.of(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation),
cacheTableMetadata);

UpdateTableRequest updateTableRequest = new UpdateTableRequest()
.withDatabaseName(database)
Expand All @@ -157,16 +169,13 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
shouldRefresh = true;
}

private Table getTable()
private Table getTable(boolean invalidateCaches)
{
try {
GetTableRequest getTableRequest = new GetTableRequest()
.withDatabaseName(database)
.withName(tableName);
return stats.getGetTable().call(() -> glueClient.getTable(getTableRequest).getTable());
}
catch (EntityNotFoundException e) {
throw new TableNotFoundException(getSchemaTableName(), e);
}
return getGlueTable.get(new SchemaTableName(database, tableName), invalidateCaches);
}

public interface GetGlueTable
{
Table get(SchemaTableName tableName, boolean invalidateCaches);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.TypeManager;

import java.util.Optional;

Expand All @@ -30,16 +31,22 @@
public class GlueIcebergTableOperationsProvider
implements IcebergTableOperationsProvider
{
private final TypeManager typeManager;
private final boolean cacheTableMetadata;
private final TrinoFileSystemFactory fileSystemFactory;
private final AWSGlueAsync glueClient;
private final GlueMetastoreStats stats;

@Inject
public GlueIcebergTableOperationsProvider(
TypeManager typeManager,
IcebergGlueCatalogConfig catalogConfig,
TrinoFileSystemFactory fileSystemFactory,
GlueMetastoreStats stats,
AWSGlueAsync glueClient)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.cacheTableMetadata = catalogConfig.isCacheTableMetadata();
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.stats = requireNonNull(stats, "stats is null");
this.glueClient = requireNonNull(glueClient, "glueClient is null");
Expand All @@ -55,8 +62,13 @@ public IcebergTableOperations createTableOperations(
Optional<String> location)
{
return new GlueIcebergTableOperations(
typeManager,
cacheTableMetadata,
glueClient,
stats,
// Share Glue Table cache between Catalog and TableOperations so that, when doing metadata queries (e.g. information_schema.columns)
// the GetTableRequest is issued once per table.
((TrinoGlueCatalog) catalog)::getTable,
new ForwardingFileIo(fileSystemFactory.create(session)),
session,
database,
Expand Down
Loading

0 comments on commit 7c32725

Please sign in to comment.