Skip to content

Commit

Permalink
Add support for Iceberg object store
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelong95 committed Aug 20, 2024
1 parent 5583b5f commit ed46a94
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class IcebergConfig
private int splitManagerThreads = Runtime.getRuntime().availableProcessors() * 2;
private boolean incrementalRefreshEnabled = true;
private boolean metadataCacheEnabled = true;
private boolean objectStoreEnabled;
private Optional<String> dataLocation = Optional.empty();

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -483,4 +485,30 @@ public IcebergConfig setMetadataCacheEnabled(boolean metadataCacheEnabled)
this.metadataCacheEnabled = metadataCacheEnabled;
return this;
}

@Config("iceberg.object-store.enabled")
@ConfigDescription("Enable the Iceberg object store file layout")
public IcebergConfig setObjectStoreEnabled(boolean objectStoreEnabled)
{
this.objectStoreEnabled = objectStoreEnabled;
return this;
}

public boolean isObjectStoreEnabled()
{
return objectStoreEnabled;
}

@Config("iceberg.data-location")
@ConfigDescription("Path for data files")
public IcebergConfig setDataLocation(String dataLocation)
{
this.dataLocation = Optional.ofNullable(dataLocation);
return this;
}

public Optional<String> getDataLocation()
{
return dataLocation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public final class IcebergSessionProperties
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled";
private static final String OBJECT_STORE_ENABLED = "object_store_enabled";
private static final String DATA_LOCATION = "data_location";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -386,6 +388,16 @@ public IcebergSessionProperties(
"Enable Incremental refresh for MVs backed by Iceberg tables, when possible.",
icebergConfig.isIncrementalRefreshEnabled(),
false))
.add(booleanProperty(
OBJECT_STORE_ENABLED,
"Enable Iceberg object store file layout",
icebergConfig.isObjectStoreEnabled(),
false))
.add(stringProperty(
DATA_LOCATION,
"Location for data files",
icebergConfig.getDataLocation().orElse(null),
false))
.build();
}

Expand Down Expand Up @@ -624,4 +636,14 @@ public static boolean isIncrementalRefreshEnabled(ConnectorSession session)
{
return session.getProperty(INCREMENTAL_REFRESH_ENABLED, Boolean.class);
}

public static boolean isObjectStoreEnabled(ConnectorSession session)
{
return session.getProperty(OBJECT_STORE_ENABLED, Boolean.class);
}

public static Optional<String> getDataLocation(ConnectorSession session)
{
return Optional.ofNullable(session.getProperty(DATA_LOCATION, String.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MIN;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
Expand All @@ -46,6 +47,8 @@ public class IcebergTableProperties
public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
public static final String OBJECT_STORE_ENABLED_PROPERTY = "object_store_enabled";
public static final String DATA_LOCATION_PROPERTY = "data_location";

private final List<PropertyMetadata<?>> tableProperties;

Expand Down Expand Up @@ -120,6 +123,16 @@ public IcebergTableProperties(
.map(name -> name.toLowerCase(ENGLISH))
.collect(toImmutableList()),
value -> value))
.add(booleanProperty(
OBJECT_STORE_ENABLED_PROPERTY,
"Set to true to enable Iceberg object store file layout. Default false.",
icebergConfig.isObjectStoreEnabled(),
false))
.add(stringProperty(
DATA_LOCATION_PROPERTY,
"File system location URI for the table's data files",
icebergConfig.getDataLocation().orElse(null),
false))
.build();
}

Expand Down Expand Up @@ -188,4 +201,14 @@ public static List<String> getParquetBloomFilterColumns(Map<String, Object> tabl
List<String> parquetBloomFilterColumns = (List<String>) tableProperties.get(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY);
return parquetBloomFilterColumns == null ? ImmutableList.of() : ImmutableList.copyOf(parquetBloomFilterColumns);
}

public static boolean getObjectStoreEnabled(Map<String, Object> tableProperties)
{
return (Boolean) tableProperties.get(OBJECT_STORE_ENABLED_PROPERTY);
}

public static Optional<String> getDataLocation(Map<String, Object> tableProperties)
{
return Optional.ofNullable((String) tableProperties.get(DATA_LOCATION_PROPERTY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,11 @@
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
import static org.apache.iceberg.types.Type.TypeID.FIXED;
import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;
Expand Down Expand Up @@ -793,6 +791,10 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
IcebergFileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toIceberg().toString());
propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties())));
propertiesBuilder.put(OBJECT_STORE_ENABLED, Boolean.toString(IcebergTableProperties.getObjectStoreEnabled(tableMetadata.getProperties())));

Optional<String> dataLocation = IcebergTableProperties.getDataLocation(tableMetadata.getProperties());
dataLocation.ifPresent(s -> propertiesBuilder.put(WRITE_DATA_LOCATION, s));

// iceberg ORC format bloom filter properties used by create table
List<String> orcBloomFilterColumns = IcebergTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties());
Expand Down Expand Up @@ -885,17 +887,6 @@ public static long getSnapshotIdAsOfTime(Table table, long epochMillis)
.snapshotId();
}

public static void validateTableCanBeDropped(Table table)
{
// TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
if (table.properties().containsKey(OBJECT_STORE_PATH) ||
table.properties().containsKey("write.folder-storage.path") || // Removed from Iceberg as of 0.14.0, but preserved for backward compatibility
table.properties().containsKey(WRITE_METADATA_LOCATION) ||
table.properties().containsKey(WRITE_DATA_LOCATION)) {
throw new TrinoException(NOT_SUPPORTED, "Table contains Iceberg path override properties and cannot be dropped from Trino: " + table.name());
}
}

private static void checkFormatForProperty(FileFormat actualStorageFormat, FileFormat expectedStorageFormat, String propertyName)
{
if (actualStorageFormat != expectedStorageFormat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getMaterializedViewTableInput;
Expand Down Expand Up @@ -669,7 +668,6 @@ private Optional<List<ColumnMetadata>> getCachedColumnMetadata(com.amazonaws.ser
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);
try {
deleteTable(schemaTableName.getSchemaName(), schemaTableName.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT;
Expand Down Expand Up @@ -385,7 +384,6 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
TableMetadata metadata = table.operations().current();
validateTableCanBeDropped(table);

io.trino.metastore.Table metastoreTable = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -302,7 +301,6 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);

jdbcCatalog.dropTable(toIdentifier(schemaTableName), false);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
Expand Down Expand Up @@ -232,8 +231,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(Connector
@Override
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);
loadTable(session, schemaTableName);
nessieClient.dropTable(toIdentifier(schemaTableName), true);
// The table folder may be referenced by other branches. Therefore, dropping the table should not delete the data.
// Nessie GC tool can be used to clean up the expired data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5046,7 +5046,8 @@ protected void verifyIcebergTableProperties(MaterializedResult actual)
assertThat(actual).isNotNull();
MaterializedResult expected = resultBuilder(getSession())
.row("write.format.default", format.name())
.row("write.parquet.compression-codec", "zstd").build();
.row("write.parquet.compression-codec", "zstd")
.row("write.object-storage.enabled", "false").build();
assertEqualsIgnoreOrder(actual.getMaterializedRows(), expected.getMaterializedRows());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ public void testDefaults()
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of())
.setSplitManagerThreads(Runtime.getRuntime().availableProcessors() * 2)
.setIncrementalRefreshEnabled(true)
.setMetadataCacheEnabled(true));
.setMetadataCacheEnabled(true)
.setIncrementalRefreshEnabled(true)
.setObjectStoreEnabled(false)
.setDataLocation(null));
}

@Test
Expand Down Expand Up @@ -106,6 +109,8 @@ public void testExplicitPropertyMappings()
.put("iceberg.split-manager-threads", "42")
.put("iceberg.incremental-refresh-enabled", "false")
.put("iceberg.metadata-cache.enabled", "false")
.put("iceberg.object-store.enabled", "true")
.put("iceberg.data-location", "data_location")
.buildOrThrow();

IcebergConfig expected = new IcebergConfig()
Expand Down Expand Up @@ -136,7 +141,10 @@ public void testExplicitPropertyMappings()
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver"))
.setSplitManagerThreads(42)
.setIncrementalRefreshEnabled(false)
.setMetadataCacheEnabled(false);
.setMetadataCacheEnabled(false)
.setIncrementalRefreshEnabled(false)
.setObjectStoreEnabled(true)
.setDataLocation("data_location");

assertFullMapping(properties, expected);
}
Expand Down
Loading

0 comments on commit ed46a94

Please sign in to comment.