From 7473dad3e7fd9072917fa62d8f5b5f81cc76afab Mon Sep 17 00:00:00 2001
From: Jake Long
Date: Mon, 29 Jan 2024 17:34:18 -0800
Subject: [PATCH] Add support for Iceberg object store

---
 docs/src/main/sphinx/connector/iceberg.md     |  10 ++
 .../trino/plugin/iceberg/IcebergConfig.java   |  28 ++++
 .../iceberg/IcebergSessionProperties.java     |  22 +++
 .../iceberg/IcebergTableProperties.java       |  25 ++++
 .../io/trino/plugin/iceberg/IcebergUtil.java  |  17 +--
 .../catalog/glue/TrinoGlueCatalog.java        |   2 -
 .../iceberg/catalog/hms/TrinoHiveCatalog.java |   2 -
 .../catalog/jdbc/TrinoJdbcCatalog.java        |   2 -
 .../catalog/nessie/TrinoNessieCatalog.java    |   4 +-
 .../iceberg/BaseIcebergConnectorTest.java     |   7 +-
 .../plugin/iceberg/TestIcebergConfig.java     |  10 +-
 .../TestIcebergTableWithObjectStore.java      | 132 ++++++++++++++++++
 .../TestIcebergSparkCompatibility.java        |  36 ++++-
 13 files changed, 265 insertions(+), 32 deletions(-)
 create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithObjectStore.java

diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md
index 240f5ee3a3ed2b..e923338c1a6772 100644
--- a/docs/src/main/sphinx/connector/iceberg.md
+++ b/docs/src/main/sphinx/connector/iceberg.md
@@ -197,6 +197,16 @@ implementation is used:
   - Set to `false` to disable in-memory caching of metadata files on the
     coordinator. This cache is not used when `fs.cache.enabled` is set to true.
   - `true`
+* - `iceberg.object-store.enabled`
+  - Set to `true` to enable Iceberg's [object store file layout](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout).
+    Enabling the object store file layout appends a deterministic hash directly
+    after the data write path, configured via `iceberg.data-location`, so that
+    data files are evenly distributed across multiple paths in a cloud storage
+    service, which avoids request throttling.
+  - `false`
+* - `iceberg.data-location`
+  - Sets the location that data files are written to.
+  - table location + `/data`
 * - `iceberg.expire-snapshots.min-retention`
   - Minimal retention period for the [`expire_snapshot` command](iceberg-expire-snapshots).
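To make the layout concrete: a standalone sketch, not part of the patch, of how inserting a deterministic hash after the data location fans files out across storage prefixes. Iceberg's `ObjectStoreLocationProvider` uses its own hash function; the CRC32 here is purely illustrative, and the bucket path is invented.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ObjectStoreLayoutSketch
{
    public static void main(String[] args)
    {
        String dataLocation = "s3://bucket/warehouse/data"; // stands in for iceberg.data-location
        for (String file : new String[] {"a.parquet", "b.parquet", "c.parquet"}) {
            CRC32 crc32 = new CRC32();
            crc32.update(file.getBytes(StandardCharsets.UTF_8));
            // Each file lands under a different hash prefix, so writes spread
            // across many paths instead of hammering a single hot prefix.
            System.out.printf("%s/%08x/%s%n", dataLocation, crc32.getValue(), file);
        }
    }
}
```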
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
index af71315d2e899d..f7aa82dfeb2f70 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
@@ -91,6 +91,8 @@ public class IcebergConfig
     private List<String> allowedExtraProperties = ImmutableList.of();
     private boolean incrementalRefreshEnabled = true;
     private boolean metadataCacheEnabled = true;
+    private boolean objectStoreEnabled;
+    private Optional<String> dataLocation = Optional.empty();
 
     public CatalogType getCatalogType()
     {
@@ -519,4 +521,30 @@ public IcebergConfig setMetadataCacheEnabled(boolean metadataCacheEnabled)
     {
         this.metadataCacheEnabled = metadataCacheEnabled;
         return this;
     }
+
+    public boolean isObjectStoreEnabled()
+    {
+        return objectStoreEnabled;
+    }
+
+    @Config("iceberg.object-store.enabled")
+    @ConfigDescription("Enable the Iceberg object store file layout")
+    public IcebergConfig setObjectStoreEnabled(boolean objectStoreEnabled)
+    {
+        this.objectStoreEnabled = objectStoreEnabled;
+        return this;
+    }
+
+    public Optional<String> getDataLocation()
+    {
+        return dataLocation;
+    }
+
+    @Config("iceberg.data-location")
+    @ConfigDescription("Path for data files")
+    public IcebergConfig setDataLocation(String dataLocation)
+    {
+        this.dataLocation = Optional.ofNullable(dataLocation);
+        return this;
+    }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
index 7f576dddb5ff5c..3e2022bb576b66 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
@@ -109,6 +109,8 @@ public final class IcebergSessionProperties
     private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
     private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
     private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled";
+    private static final String OBJECT_STORE_ENABLED = "object_store_enabled";
+    private static final String DATA_LOCATION = "data_location";
 
     private final List<PropertyMetadata<?>> sessionProperties;
 
@@ -392,6 +394,16 @@ public IcebergSessionProperties(
                         "Enable Incremental refresh for MVs backed by Iceberg tables, when possible.",
                         icebergConfig.isIncrementalRefreshEnabled(),
                         false))
+                .add(booleanProperty(
+                        OBJECT_STORE_ENABLED,
+                        "Enable Iceberg object store file layout",
+                        icebergConfig.isObjectStoreEnabled(),
+                        false))
+                .add(stringProperty(
+                        DATA_LOCATION,
+                        "Location for data files",
+                        icebergConfig.getDataLocation().orElse(null),
+                        false))
                 .build();
     }
 
@@ -635,4 +647,14 @@ public static boolean isIncrementalRefreshEnabled(ConnectorSession session)
     {
         return session.getProperty(INCREMENTAL_REFRESH_ENABLED, Boolean.class);
     }
+
+    public static boolean isObjectStoreEnabled(ConnectorSession session)
+    {
+        return session.getProperty(OBJECT_STORE_ENABLED, Boolean.class);
+    }
+
+    public static Optional<String> getDataLocation(ConnectorSession session)
+    {
+        return Optional.ofNullable(session.getProperty(DATA_LOCATION, String.class));
+    }
 }
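A usage sketch for the session properties registered above, assuming a test extending `AbstractTestQueryFramework` with a catalog named `iceberg`; the path is hypothetical, and which operations consult `isObjectStoreEnabled`/`getDataLocation` is up to their call sites, which this hunk does not show.

```java
// Sketch: override the catalog defaults for a single session.
Session session = Session.builder(getSession())
        .setCatalogSessionProperty("iceberg", "object_store_enabled", "true")
        .setCatalogSessionProperty("iceberg", "data_location", "local:///custom-data") // hypothetical path
        .build();
```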
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
index ca967be9652def..b8cff1c640a176 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
@@ -35,6 +35,7 @@
 import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
 import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MIN;
 import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
+import static io.trino.spi.session.PropertyMetadata.booleanProperty;
 import static io.trino.spi.session.PropertyMetadata.doubleProperty;
 import static io.trino.spi.session.PropertyMetadata.enumProperty;
 import static io.trino.spi.session.PropertyMetadata.integerProperty;
@@ -58,6 +59,8 @@ public class IcebergTableProperties
     public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
     public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
     public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
+    public static final String OBJECT_STORE_ENABLED_PROPERTY = "object_store_enabled";
+    public static final String DATA_LOCATION_PROPERTY = "data_location";
     public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";
 
     public static final Set<String> SUPPORTED_PROPERTIES = ImmutableSet.<String>builder()
@@ -68,6 +71,8 @@ public class IcebergTableProperties
             .add(FORMAT_VERSION_PROPERTY)
             .add(ORC_BLOOM_FILTER_COLUMNS_PROPERTY)
             .add(ORC_BLOOM_FILTER_FPP_PROPERTY)
+            .add(OBJECT_STORE_ENABLED_PROPERTY)
+            .add(DATA_LOCATION_PROPERTY)
             .add(EXTRA_PROPERTIES_PROPERTY)
             .add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)
             .build();
@@ -175,6 +180,16 @@ public IcebergTableProperties(
                             .collect(toImmutableMap(entry -> entry.getKey().toLowerCase(ENGLISH), Map.Entry::getValue));
                 },
                 value -> value))
+                .add(booleanProperty(
+                        OBJECT_STORE_ENABLED_PROPERTY,
+                        "Set to true to enable Iceberg object store file layout. Default false.",
+                        icebergConfig.isObjectStoreEnabled(),
+                        false))
+                .add(stringProperty(
+                        DATA_LOCATION_PROPERTY,
+                        "File system location URI for the table's data files",
+                        icebergConfig.getDataLocation().orElse(null),
+                        false))
                 .build();
 
         checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream()
@@ -249,6 +264,16 @@ public static List<String> getParquetBloomFilterColumns(Map<String, Object> tabl
         return parquetBloomFilterColumns == null ? ImmutableList.of() : ImmutableList.copyOf(parquetBloomFilterColumns);
     }
 
+    public static boolean getObjectStoreEnabled(Map<String, Object> tableProperties)
+    {
+        return (Boolean) tableProperties.get(OBJECT_STORE_ENABLED_PROPERTY);
+    }
+
+    public static Optional<String> getDataLocation(Map<String, Object> tableProperties)
+    {
+        return Optional.ofNullable((String) tableProperties.get(DATA_LOCATION_PROPERTY));
+    }
+
     public static Optional<Map<String, String>> getExtraProperties(Map<String, Object> tableProperties)
     {
         return Optional.ofNullable((Map<String, String>) tableProperties.get(EXTRA_PROPERTIES_PROPERTY));
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
index 4430f167e9bf9d..a9aae926d18bfc 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
@@ -171,13 +171,11 @@
 import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
 import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
 import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT;
-import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
 import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
 import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
 import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
 import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
 import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
-import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
 import static org.apache.iceberg.types.Type.TypeID.BINARY;
 import static org.apache.iceberg.types.Type.TypeID.FIXED;
 import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;
@@ -812,6 +810,10 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
         IcebergFileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties());
         propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toIceberg().toString());
         propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties())));
+        propertiesBuilder.put(OBJECT_STORE_ENABLED, Boolean.toString(IcebergTableProperties.getObjectStoreEnabled(tableMetadata.getProperties())));
+
+        Optional<String> dataLocation = IcebergTableProperties.getDataLocation(tableMetadata.getProperties());
+        dataLocation.ifPresent(s -> propertiesBuilder.put(WRITE_DATA_LOCATION, s));
 
         // iceberg ORC format bloom filter properties used by create table
         List<String> orcBloomFilterColumns = IcebergTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties());
@@ -931,17 +933,6 @@ public static long getSnapshotIdAsOfTime(Table table, long epochMillis)
                 .snapshotId();
     }
 
-    public static void validateTableCanBeDropped(Table table)
-    {
-        // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
-        if (table.properties().containsKey(OBJECT_STORE_PATH) ||
-                table.properties().containsKey("write.folder-storage.path") || // Removed from Iceberg as of 0.14.0, but preserved for backward compatibility
-                table.properties().containsKey(WRITE_METADATA_LOCATION) ||
-                table.properties().containsKey(WRITE_DATA_LOCATION)) {
-            throw new TrinoException(NOT_SUPPORTED, "Table contains Iceberg path override properties and cannot be dropped from Trino: " + table.name());
-        }
-    }
-
     private static void checkFormatForProperty(FileFormat actualStorageFormat, FileFormat expectedStorageFormat, String propertyName)
     {
         if (actualStorageFormat != expectedStorageFormat) {
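For reference, `OBJECT_STORE_ENABLED` and `WRITE_DATA_LOCATION` are the `org.apache.iceberg.TableProperties` constants imported above, with values `write.object-storage.enabled` and `write.data.path`. A sketch of the entries `createTableProperties` ends up writing (values hypothetical):

```java
import java.util.Map;

import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;

// "write.object-storage.enabled" -> "true", "write.data.path" -> the location;
// this is why the connector test below expects a "write.object-storage.enabled" row.
Map<String, String> icebergProperties = Map.of(
        OBJECT_STORE_ENABLED, "true",
        WRITE_DATA_LOCATION, "s3://bucket/warehouse/data");
```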
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java
index a0a998ac75dd9e..9d39fa32eb602b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java
@@ -149,7 +149,6 @@
 import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
 import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
 import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
-import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
 import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
 import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
 import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getMaterializedViewTableInput;
@@ -674,7 +673,6 @@ private Optional<List<ColumnMetadata>> getCachedColumnMetadata(com.amazonaws.ser
     public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
     {
         BaseTable table = (BaseTable) loadTable(session, schemaTableName);
-        validateTableCanBeDropped(table);
         try {
             deleteTable(schemaTableName.getSchemaName(), schemaTableName.getTableName());
         }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java
index 31fd010bfda682..45e749fa529eab 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java
@@ -103,7 +103,6 @@
 import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
 import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
 import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
-import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
 import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
 import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
 import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT;
@@ -385,7 +384,6 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
     {
         BaseTable table = (BaseTable) loadTable(session, schemaTableName);
         TableMetadata metadata = table.operations().current();
-        validateTableCanBeDropped(table);
 
         io.trino.metastore.Table metastoreTable = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
                 .orElseThrow(() -> new TableNotFoundException(schemaTableName));
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java
index e9658218234aa3..1d8daf1effd1e7 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java
@@ -81,7 +81,6 @@
 import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
 import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
-import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.lang.String.format;
 import static java.util.Locale.ENGLISH;
@@ -319,7 +318,6 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
     public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
     {
         BaseTable table = (BaseTable) loadTable(session, schemaTableName);
-        validateTableCanBeDropped(table);
         jdbcCatalog.dropTable(toIdentifier(schemaTableName), false);
 
         try {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java
index 4f42fe33ed58c7..240e41f4cdd16d 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java
@@ -64,7 +64,6 @@
 import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
 import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
-import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
 import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static io.trino.spi.connector.SchemaTableName.schemaTableName;
@@ -232,8 +231,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(Connector
     @Override
     public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
     {
-        BaseTable table = (BaseTable) loadTable(session, schemaTableName);
-        validateTableCanBeDropped(table);
+        loadTable(session, schemaTableName);
         nessieClient.dropTable(toIdentifier(schemaTableName), true);
         // The table folder may be referenced by other branches. Therefore, dropping the table should not delete the data.
         // Nessie GC tool can be used to clean up the expired data.
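With `validateTableCanBeDropped` gone from all four catalogs, tables carrying path-override properties can now be dropped. A hypothetical product-test-style illustration (table name and location invented); the real test updates appear further down:

```java
// Previously this DROP failed with "Table contains Iceberg path override
// properties and cannot be dropped from Trino"; it now succeeds.
onTrino().executeQuery("CREATE TABLE iceberg.default.drop_me (x INT) WITH (data_location = 'local:///override/data')");
onTrino().executeQuery("DROP TABLE iceberg.default.drop_me");
```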
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
index 140399c2c27526..c1fcdbac52aed8 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
@@ -5051,7 +5051,8 @@ protected void verifyIcebergTableProperties(MaterializedResult actual)
         assertThat(actual).isNotNull();
         MaterializedResult expected = resultBuilder(getSession())
                 .row("write.format.default", format.name())
-                .row("write.parquet.compression-codec", "zstd").build();
+                .row("write.parquet.compression-codec", "zstd")
+                .row("write.object-storage.enabled", "false").build();
         assertEqualsIgnoreOrder(actual.getMaterializedRows(), expected.getMaterializedRows());
     }
 
@@ -7475,6 +7476,10 @@ public void testAlterTableWithUnsupportedProperties()
                 "The following properties cannot be updated: location, orc_bloom_filter_fpp");
         assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES format = 'ORC', orc_bloom_filter_columns = ARRAY['a']",
                 "The following properties cannot be updated: orc_bloom_filter_columns");
+        assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES object_store_enabled = true",
+                "The following properties cannot be updated: object_store_enabled");
+        assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES data_location = '/data'",
+                "The following properties cannot be updated: data_location");
         assertQueryFails("ALTER TABLE " + tableName + " SET PROPERTIES extra_properties = MAP(ARRAY['extra.property.one'], ARRAY['foo'])",
                 "The following properties cannot be updated: extra_properties");
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
index 1ec49403515e47..4cf96f7b6ae1fe 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
@@ -74,7 +74,9 @@ public void testDefaults()
                 .setSplitManagerThreads(Runtime.getRuntime().availableProcessors() * 2)
                 .setAllowedExtraProperties(ImmutableList.of())
                 .setIncrementalRefreshEnabled(true)
-                .setMetadataCacheEnabled(true));
+                .setMetadataCacheEnabled(true)
+                .setObjectStoreEnabled(false)
+                .setDataLocation(null));
     }
 
     @Test
@@ -111,6 +113,8 @@ public void testExplicitPropertyMappings()
                 .put("iceberg.allowed-extra-properties", "propX,propY")
                 .put("iceberg.incremental-refresh-enabled", "false")
                 .put("iceberg.metadata-cache.enabled", "false")
+                .put("iceberg.object-store.enabled", "true")
+                .put("iceberg.data-location", "data_location")
                 .buildOrThrow();
 
         IcebergConfig expected = new IcebergConfig()
@@ -143,7 +147,9 @@ public void testExplicitPropertyMappings()
                 .setSplitManagerThreads(42)
                 .setAllowedExtraProperties(ImmutableList.of("propX", "propY"))
                 .setIncrementalRefreshEnabled(false)
-                .setMetadataCacheEnabled(false);
+                .setMetadataCacheEnabled(false)
+                .setObjectStoreEnabled(true)
+                .setDataLocation("data_location");
 
         assertFullMapping(properties, expected);
     }
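The `write.object-storage.enabled` row asserted in `verifyIcebergTableProperties` surfaces through the `$properties` metadata table. A sketch of inspecting it directly (table name hypothetical):

```java
// Expect a ("write.object-storage.enabled", "false") row for a table created
// with default configuration, next to write.format.default and the codec.
MaterializedResult properties = computeActual("SELECT * FROM \"test_table$properties\"");
```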
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithObjectStore.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithObjectStore.java
new file mode 100644
index 00000000000000..4ab7f7a323434a
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithObjectStore.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.metastore.HiveMetastore;
+import io.trino.metastore.Table;
+import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.MaterializedResult;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
+import static io.trino.plugin.iceberg.DataFileRecord.toDataFileRecord;
+import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
+import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
+import static io.trino.testing.TestingConnectorSession.SESSION;
+import static org.assertj.core.api.Assertions.assertThat;
+
+final class TestIcebergTableWithObjectStore
+        extends AbstractTestQueryFramework
+{
+    private HiveMetastore metastore;
+    private TrinoFileSystem fileSystem;
+
+    @Override
+    protected DistributedQueryRunner createQueryRunner()
+            throws Exception
+    {
+        DistributedQueryRunner queryRunner = IcebergQueryRunner.builder()
+                .setIcebergProperties(Map.of(
+                        "iceberg.object-store.enabled", "true",
+                        "iceberg.data-location", "local:///table-location/xyz"))
+                .build();
+
+        metastore = ((IcebergConnector) queryRunner.getCoordinator().getConnector(ICEBERG_CATALOG)).getInjector()
+                .getInstance(HiveMetastoreFactory.class)
+                .createMetastore(Optional.empty());
+
+        fileSystem = getFileSystemFactory(queryRunner).create(SESSION);
+
+        return queryRunner;
+    }
+
+    @Test
+    void testCreateAndDrop()
+            throws Exception
+    {
+        assertQuerySucceeds("CREATE TABLE test_create_and_drop AS SELECT 1 AS val");
+        Table table = metastore.getTable("tpch", "test_create_and_drop").orElseThrow();
+        assertThat(table.getTableType()).isEqualTo(EXTERNAL_TABLE.name());
+
+        Location tableLocation = Location.of(table.getStorage().getLocation());
+        assertThat(fileSystem.newInputFile(tableLocation).exists())
+                .describedAs("The directory corresponding to the table storage location should exist")
+                .isTrue();
+
+        MaterializedResult materializedResult = computeActual("SELECT * FROM \"test_create_and_drop$files\"");
+        assertThat(materializedResult.getRowCount()).isEqualTo(1);
+        DataFileRecord dataFile = toDataFileRecord(materializedResult.getMaterializedRows().getFirst());
+        Location dataFileLocation = Location.of(dataFile.getFilePath());
+        assertThat(fileSystem.newInputFile(dataFileLocation).exists())
+                .describedAs("The data file should exist")
+                .isTrue();
+        assertThat(dataFile.getFilePath())
+                .describedAs("The data file's path should start with the configured location")
+                .startsWith("local:///table-location/xyz");
+
+        assertQuerySucceeds("DROP TABLE test_create_and_drop");
+        assertThat(metastore.getTable("tpch", "test_create_and_drop"))
+                .describedAs("Table should be dropped")
+                .isEmpty();
+        assertThat(fileSystem.newInputFile(dataFileLocation).exists())
+                .describedAs("The data file should have been removed")
+                .isFalse();
+        assertThat(fileSystem.newInputFile(tableLocation).exists())
+                .describedAs("The directory corresponding to the dropped Iceberg table should no longer exist")
+                .isFalse();
+    }
+
+    @Test
+    void testCreateAndDropWithDifferentDataLocation()
+            throws Exception
+    {
+        assertQuerySucceeds("CREATE TABLE test_create_and_drop_with_different_location WITH (data_location = 'local:///table-location-2/abc') AS SELECT 1 AS val");
+        Table table = metastore.getTable("tpch", "test_create_and_drop_with_different_location").orElseThrow();
+        assertThat(table.getTableType()).isEqualTo(EXTERNAL_TABLE.name());
+
+        Location tableLocation = Location.of(table.getStorage().getLocation());
+        assertThat(fileSystem.newInputFile(tableLocation).exists())
+                .describedAs("The directory corresponding to the table storage location should exist")
+                .isTrue();
+
+        MaterializedResult materializedResult = computeActual("SELECT * FROM \"test_create_and_drop_with_different_location$files\"");
+        assertThat(materializedResult.getRowCount()).isEqualTo(1);
+        DataFileRecord dataFile = toDataFileRecord(materializedResult.getMaterializedRows().get(0));
+        Location dataFileLocation = Location.of(dataFile.getFilePath());
+        assertThat(fileSystem.newInputFile(dataFileLocation).exists())
+                .describedAs("The data file should exist")
+                .isTrue();
+        assertThat(dataFile.getFilePath())
+                .describedAs("The data file's path should start with the configured location")
+                .startsWith("local:///table-location-2/abc");
+
+        assertQuerySucceeds("DROP TABLE test_create_and_drop_with_different_location");
+        assertThat(metastore.getTable("tpch", "test_create_and_drop_with_different_location"))
+                .describedAs("Table should be dropped")
+                .isEmpty();
+        assertThat(fileSystem.newInputFile(dataFileLocation).exists())
+                .describedAs("The data file should have been removed")
+                .isFalse();
+        assertThat(fileSystem.newInputFile(tableLocation).exists())
+                .describedAs("The directory corresponding to the dropped Iceberg table should no longer exist")
+                .isFalse();
+    }
+}
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
index 4084b889768f6b..4f87a632ed07dc 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
@@ -1150,10 +1150,7 @@ public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat
         assertThat(queryResult).hasRowsCount(1).hasColumnsCount(1);
         assertThat(((String) queryResult.getOnlyValue())).contains(dataPath);
 
-        // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
-        assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + trinoTableName))
-                .hasMessageContaining("contains Iceberg path override properties and cannot be dropped from Trino");
-        onSpark().executeQuery("DROP TABLE " + sparkTableName);
+        onTrino().executeQuery("DROP TABLE " + trinoTableName);
     }
 
     @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
@@ -1179,9 +1176,34 @@ public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageForma
         assertThat(queryResult).hasRowsCount(1).hasColumnsCount(1);
         assertThat(((String) queryResult.getOnlyValue())).contains(dataPath);
 
-        assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + trinoTableName))
-                .hasMessageContaining("contains Iceberg path override properties and cannot be dropped from Trino");
-        onSpark().executeQuery("DROP TABLE " + sparkTableName);
+        onTrino().executeQuery("DROP TABLE " + trinoTableName);
+    }
+
+    @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
+    public void testSparkReadingTrinoObjectStorage(StorageFormat storageFormat, int specVersion)
+    {
+        String baseTableName = toLowerCase("test_trino_object_storage_location_provider_" + storageFormat);
+        String sparkTableName = sparkTableName(baseTableName);
+        String trinoTableName = trinoTableName(baseTableName);
+        String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_trino_object_storage_location_provider/obj-data";
+
+        onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (" +
+                "object_store_enabled = true," +
+                "data_location = '%s'," +
+                "format = '%s'," +
+                "format_version = %s)",
+                trinoTableName, dataPath, storageFormat, specVersion));
+        onTrino().executeQuery(format("INSERT INTO %s VALUES ('a_string', 1000000000000000)", trinoTableName));
+
+        Row result = row("a_string", 1000000000000000L);
+        assertThat(onSpark().executeQuery(format("SELECT _string, _bigint FROM %s", sparkTableName))).containsOnly(result);
+        assertThat(onTrino().executeQuery(format("SELECT _string, _bigint FROM %s", trinoTableName))).containsOnly(result);
+
+        QueryResult queryResult = onTrino().executeQuery(format("SELECT file_path FROM %s", trinoTableName("\"" + baseTableName + "$files\"")));
+        assertThat(queryResult).hasRowsCount(1).hasColumnsCount(1);
+        assertThat((String) queryResult.getOnlyValue()).contains(dataPath);
+
+        onTrino().executeQuery("DROP TABLE " + trinoTableName);
     }
 
     private static final List<String> SPECIAL_CHARACTER_VALUES = ImmutableList.of(