Add support for Iceberg object store
jakelong95 authored and ebyhr committed Nov 7, 2024
1 parent ec2a985 commit 78224d6
Showing 14 changed files with 247 additions and 32 deletions.
10 changes: 10 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
@@ -197,6 +197,16 @@ implementation is used:
- Set to `false` to disable in-memory caching of metadata files on the
coordinator. This cache is not used when `fs.cache.enabled` is set to true.
- `true`
* - `iceberg.object-store.enabled`
- Set to `true` to enable Iceberg's [object store file layout](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout).
    Enabling the object store file layout inserts a deterministic hash directly
    after the data write path, configured with `iceberg.data-location`, so that
    data files are distributed evenly across multiple paths in a cloud storage
    service, avoiding request throttling.
- `false`
* - `iceberg.data-location`
- Sets the path that data files are written to.
- table location + /data
* - `iceberg.expire-snapshots.min-retention`
- Minimal retention period for the
[`expire_snapshots` command](iceberg-expire-snapshots).
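For example (illustrative bucket and hash values), with `iceberg.object-store.enabled=true` and `iceberg.data-location=s3://my-bucket/data`, a file that the default layout would place under the table location, such as `s3://my-bucket/my-table/data/date=2024-11-07/00000-0.parquet`, is instead written under a hashed prefix such as `s3://my-bucket/data/2d3905f8/date=2024-11-07/00000-0.parquet`, so that concurrent reads and writes fan out across many key prefixes.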
@@ -91,6 +91,8 @@ public class IcebergConfig
private List<String> allowedExtraProperties = ImmutableList.of();
private boolean incrementalRefreshEnabled = true;
private boolean metadataCacheEnabled = true;
private boolean objectStoreEnabled;
private Optional<String> dataLocation = Optional.empty();

public CatalogType getCatalogType()
{
@@ -519,4 +521,30 @@ public IcebergConfig setMetadataCacheEnabled(boolean metadataCacheEnabled)
this.metadataCacheEnabled = metadataCacheEnabled;
return this;
}

public boolean isObjectStoreEnabled()
{
return objectStoreEnabled;
}

@Config("iceberg.object-store.enabled")
@ConfigDescription("Enable the Iceberg object store file layout")
public IcebergConfig setObjectStoreEnabled(boolean objectStoreEnabled)
{
this.objectStoreEnabled = objectStoreEnabled;
return this;
}

public Optional<String> getDataLocation()
{
return dataLocation;
}

@Config("iceberg.data-location")
@ConfigDescription("Path for data files")
public IcebergConfig setDataLocation(String dataLocation)
{
this.dataLocation = Optional.ofNullable(dataLocation);
return this;
}
}
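To make the layout concrete, here is a minimal, self-contained sketch of the idea behind the object store file layout. The hash function (CRC32) and the exact path shape are stand-ins rather than Iceberg's actual `ObjectStoreLocationProvider` scheme, and all bucket and table names are hypothetical.

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Illustration of the object store layout idea: a deterministic hash of the
// file path is inserted between the data location and the file name, so that
// writes fan out across many key prefixes in the object store.
public class ObjectStoreLayoutSketch
{
    static String hashedPath(String dataLocation, String relativePath)
    {
        CRC32 crc32 = new CRC32();
        crc32.update(relativePath.getBytes(StandardCharsets.UTF_8));
        String hash = Long.toHexString(crc32.getValue());
        return "%s/%s/%s".formatted(dataLocation, hash, relativePath);
    }

    public static void main(String[] args)
    {
        // Prints something like: s3://my-bucket/data/1c8f2a3b/orders/date=2024-11-07/00000-0.parquet
        System.out.println(hashedPath("s3://my-bucket/data", "orders/date=2024-11-07/00000-0.parquet"));
    }
}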
@@ -270,6 +270,7 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.EXTRA_PROPERTIES_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_ENABLED_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
@@ -357,6 +358,7 @@
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
import static org.apache.iceberg.types.TypeUtil.indexParents;
@@ -376,6 +378,7 @@ public class IcebergMetadata
.add(EXTRA_PROPERTIES_PROPERTY)
.add(FILE_FORMAT_PROPERTY)
.add(FORMAT_VERSION_PROPERTY)
.add(OBJECT_STORE_ENABLED_PROPERTY)
.add(PARTITIONING_PROPERTY)
.add(SORTED_BY_PROPERTY)
.build();
@@ -2153,6 +2156,12 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
updateProperties.set(FORMAT_VERSION, Integer.toString(formatVersion));
}

if (properties.containsKey(OBJECT_STORE_ENABLED_PROPERTY)) {
boolean objectStoreEnabled = (boolean) properties.get(OBJECT_STORE_ENABLED_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The object_store_enabled property cannot be empty"));
updateProperties.set(OBJECT_STORE_ENABLED, Boolean.toString(objectStoreEnabled));
}

try {
updateProperties.commit();
}
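The branch above ultimately drives Iceberg's `UpdateProperties` API. A minimal sketch of the equivalent direct call, assuming an `org.apache.iceberg.Table` handle obtained from a catalog elsewhere:

import org.apache.iceberg.Table;

import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;

class ObjectStoreToggle
{
    // Mirrors what setTableProperties does for the object_store_enabled property.
    static void setObjectStoreEnabled(Table table, boolean enabled)
    {
        table.updateProperties()
                .set(OBJECT_STORE_ENABLED, Boolean.toString(enabled)) // key: write.object-storage.enabled
                .commit();
    }
}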
@@ -35,6 +35,7 @@
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MIN;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
@@ -58,6 +59,8 @@ public class IcebergTableProperties
public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
public static final String OBJECT_STORE_ENABLED_PROPERTY = "object_store_enabled";
public static final String DATA_LOCATION_PROPERTY = "data_location";
public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";

public static final Set<String> SUPPORTED_PROPERTIES = ImmutableSet.<String>builder()
@@ -68,6 +71,8 @@
.add(FORMAT_VERSION_PROPERTY)
.add(ORC_BLOOM_FILTER_COLUMNS_PROPERTY)
.add(ORC_BLOOM_FILTER_FPP_PROPERTY)
.add(OBJECT_STORE_ENABLED_PROPERTY)
.add(DATA_LOCATION_PROPERTY)
.add(EXTRA_PROPERTIES_PROPERTY)
.add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)
.build();
@@ -175,6 +180,16 @@ public IcebergTableProperties(
.collect(toImmutableMap(entry -> entry.getKey().toLowerCase(ENGLISH), Map.Entry::getValue));
},
value -> value))
.add(booleanProperty(
OBJECT_STORE_ENABLED_PROPERTY,
"Set to true to enable Iceberg object store file layout",
icebergConfig.isObjectStoreEnabled(),
false))
.add(stringProperty(
DATA_LOCATION_PROPERTY,
"File system location URI for the table's data files",
icebergConfig.getDataLocation().orElse(null),
false))
.build();

checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream()
@@ -249,6 +264,16 @@ public static List<String> getParquetBloomFilterColumns(Map<String, Object> tabl
return parquetBloomFilterColumns == null ? ImmutableList.of() : ImmutableList.copyOf(parquetBloomFilterColumns);
}

public static boolean getObjectStoreEnabled(Map<String, Object> tableProperties)
{
return (Boolean) tableProperties.get(OBJECT_STORE_ENABLED_PROPERTY);
}

public static Optional<String> getDataLocation(Map<String, Object> tableProperties)
{
return Optional.ofNullable((String) tableProperties.get(DATA_LOCATION_PROPERTY));
}

public static Optional<Map<String, String>> getExtraProperties(Map<String, Object> tableProperties)
{
return Optional.ofNullable((Map<String, String>) tableProperties.get(EXTRA_PROPERTIES_PROPERTY));
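With these properties registered, a user opts into the layout per table; for illustration (hypothetical schema, table, and bucket names): `CREATE TABLE iceberg.tpch.orders (orderkey bigint) WITH (object_store_enabled = true, data_location = 's3://my-bucket/orders-data')`. Both properties fall back to the catalog-level defaults from `IcebergConfig` when omitted.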
@@ -120,6 +120,7 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_ENABLED_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY;
@@ -157,6 +158,7 @@
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid;
import static java.lang.Boolean.parseBoolean;
import static java.lang.Double.parseDouble;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.parseFloat;
@@ -171,13 +173,11 @@
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
import static org.apache.iceberg.types.Type.TypeID.FIXED;
import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;
@@ -332,6 +332,10 @@ public static Map<String, Object> getIcebergTableProperties(Table icebergTable)
properties.put(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY, ImmutableList.copyOf(parquetBloomFilterColumns));
}

if (parseBoolean(icebergTable.properties().getOrDefault(OBJECT_STORE_ENABLED, "false"))) {
properties.put(OBJECT_STORE_ENABLED_PROPERTY, true);
}

return properties.buildOrThrow();
}
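Note that `object_store_enabled` is surfaced only when the underlying `write.object-storage.enabled` table property is `true`, so the reported properties (for example in `SHOW CREATE TABLE` output) are unchanged for tables using the default hierarchical layout.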

@@ -812,6 +816,10 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
IcebergFileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toIceberg().toString());
propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties())));
propertiesBuilder.put(OBJECT_STORE_ENABLED, Boolean.toString(IcebergTableProperties.getObjectStoreEnabled(tableMetadata.getProperties())));

Optional<String> dataLocation = IcebergTableProperties.getDataLocation(tableMetadata.getProperties());
dataLocation.ifPresent(s -> propertiesBuilder.put(WRITE_DATA_LOCATION, s));

// iceberg ORC format bloom filter properties used by create table
List<String> orcBloomFilterColumns = IcebergTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties());
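The new table properties map directly onto Iceberg's own keys here: `object_store_enabled` is written as `OBJECT_STORE_ENABLED`, that is `write.object-storage.enabled`, and `data_location` as `WRITE_DATA_LOCATION`, i.e. `write.data.path` in current Iceberg releases.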
@@ -936,17 +944,6 @@ public static long getSnapshotIdAsOfTime(Table table, long epochMillis)
.snapshotId();
}

public static void validateTableCanBeDropped(Table table)
{
// TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
if (table.properties().containsKey(OBJECT_STORE_PATH) ||
table.properties().containsKey("write.folder-storage.path") || // Removed from Iceberg as of 0.14.0, but preserved for backward compatibility
table.properties().containsKey(WRITE_METADATA_LOCATION) ||
table.properties().containsKey(WRITE_DATA_LOCATION)) {
throw new TrinoException(NOT_SUPPORTED, "Table contains Iceberg path override properties and cannot be dropped from Trino: " + table.name());
}
}

private static void checkFormatForProperty(FileFormat actualStorageFormat, FileFormat expectedStorageFormat, String propertyName)
{
if (actualStorageFormat != expectedStorageFormat) {
@@ -149,7 +149,6 @@
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getMaterializedViewTableInput;
@@ -674,7 +673,6 @@ private Optional<List<ColumnMetadata>> getCachedColumnMetadata(com.amazonaws.ser
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);
try {
deleteTable(schemaTableName.getSchemaName(), schemaTableName.getTableName());
}
@@ -103,7 +103,6 @@
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT;
@@ -385,7 +384,6 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
TableMetadata metadata = table.operations().current();
validateTableCanBeDropped(table);

io.trino.metastore.Table metastoreTable = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
@@ -81,7 +81,6 @@
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
@@ -319,7 +318,6 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);

jdbcCatalog.dropTable(toIdentifier(schemaTableName), false);
try {
@@ -64,7 +64,6 @@
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
@@ -232,8 +231,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(Connector
@Override
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
loadTable(session, schemaTableName);
nessieClient.dropTable(toIdentifier(schemaTableName), true);
// The table folder may be referenced by other branches. Therefore, dropping the table should not delete the data.
// Nessie GC tool can be used to clean up the expired data.
@@ -5051,7 +5051,8 @@ protected void verifyIcebergTableProperties(MaterializedResult actual)
assertThat(actual).isNotNull();
MaterializedResult expected = resultBuilder(getSession())
.row("write.format.default", format.name())
.row("write.parquet.compression-codec", "zstd").build();
.row("write.parquet.compression-codec", "zstd")
.row("write.object-storage.enabled", "false").build();
assertEqualsIgnoreOrder(actual.getMaterializedRows(), expected.getMaterializedRows());
}
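The additional expected row reflects that `createTableProperties` now sets `write.object-storage.enabled` unconditionally, so even tables created with defaults carry the property with the value `false`.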

@@ -7475,6 +7476,10 @@ public void testAlterTableWithUnsupportedProperties()
"The following properties cannot be updated: location, orc_bloom_filter_fpp");
assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES format = 'ORC', orc_bloom_filter_columns = ARRAY['a']",
"The following properties cannot be updated: orc_bloom_filter_columns");
assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES object_store_enabled = true",
"The following properties cannot be updated: object_store_enabled");
assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES data_location = '/data'",
"The following properties cannot be updated: data_location");

assertUpdate("DROP TABLE " + tableName);
}
@@ -74,7 +74,10 @@ public void testDefaults()
.setSplitManagerThreads(Runtime.getRuntime().availableProcessors() * 2)
.setAllowedExtraProperties(ImmutableList.of())
.setIncrementalRefreshEnabled(true)
.setMetadataCacheEnabled(true)
.setObjectStoreEnabled(false)
.setDataLocation(null));
}

@Test
@@ -111,6 +114,8 @@ public void testExplicitPropertyMappings()
.put("iceberg.allowed-extra-properties", "propX,propY")
.put("iceberg.incremental-refresh-enabled", "false")
.put("iceberg.metadata-cache.enabled", "false")
.put("iceberg.object-store.enabled", "true")
.put("iceberg.data-location", "data_location")
.buildOrThrow();

IcebergConfig expected = new IcebergConfig()
@@ -143,7 +148,10 @@ public void testExplicitPropertyMappings()
.setSplitManagerThreads(42)
.setAllowedExtraProperties(ImmutableList.of("propX", "propY"))
.setIncrementalRefreshEnabled(false)
.setMetadataCacheEnabled(false)
.setObjectStoreEnabled(true)
.setDataLocation("data_location");

assertFullMapping(properties, expected);
}