diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst
index 7f6357f6d848..e91976211ffe 100644
--- a/presto-docs/src/main/sphinx/connector/iceberg.rst
+++ b/presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -812,6 +812,32 @@ Examples:
 
     CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);
 
+Remove Orphan Files
+^^^^^^^^^^^^^^^^^^^
+
+Use this procedure to remove files that are not referenced in any metadata file of an Iceberg table.
+
+The following arguments are available:
+
+===================== ========== =============== =======================================================================
+Argument Name         Required   Type            Description
+===================== ========== =============== =======================================================================
+``schema``            ✔️         string          Schema of the table to clean
+
+``table_name``        ✔️         string          Name of the table to clean
+
+``older_than``                   timestamp       Remove orphan files created before this timestamp (default: 3 days ago)
+===================== ========== =============== =======================================================================
+
+Examples:
+
+* Remove any files which are not known to the table ``db.sample`` and are older than the specified timestamp::
+
+    CALL iceberg.system.remove_orphan_files('db', 'sample', TIMESTAMP '2023-08-31 00:00:00.000');
+
+* Remove any files which are not known to the table ``db.sample`` and were created more than 3 days ago (the default)::
+
+    CALL iceberg.system.remove_orphan_files(schema => 'db', table_name => 'sample');
 
 SQL Support
 -----------
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
index 339ccc00813c..7ab6af729375 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
@@ -560,7 +560,7 @@ protected ImmutableMap createMetadataProperties(Table icebergTab
         return properties.build();
     }
 
-    protected static Schema toIcebergSchema(List<ColumnMetadata> columns)
+    public static Schema toIcebergSchema(List<ColumnMetadata> columns)
     {
         List<NestedField> icebergColumns = new ArrayList<>();
         for (ColumnMetadata column : columns) {
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
index ba6db6eec5df..66d06f3a3106 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
@@ -41,6 +41,7 @@
 import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
 import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
 import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
+import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles;
 import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
 import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
 import com.facebook.presto.orc.CachingStripeMetadataSource;
@@ -154,6 +155,7 @@ public void setup(Binder binder)
         procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
         procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
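At a high level, ``remove_orphan_files`` builds the set of file names still reachable from the table's snapshots, manifests, statistics files, and metadata files, then deletes any file found under the table's metadata or data location that is missing from that set and older than the cutoff. The sketch below illustrates only that filtering step; the class and method names are placeholders for illustration and are not connector APIs::

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Illustrative only: a file is an orphan when nothing in the table metadata references it
    // and it is older than the cutoff (by default, three days before the current time).
    final class OrphanFileSketch
    {
        static List<String> findOrphans(Set<String> referencedFileNames, Map<String, Long> fileNameToModifiedMillis, long cutoffMillis)
        {
            List<String> orphans = new ArrayList<>();
            fileNameToModifiedMillis.forEach((name, modifiedMillis) -> {
                if (modifiedMillis <= cutoffMillis && !referencedFileNames.contains(name)) {
                    orphans.add(name);
                }
            });
            return orphans;
        }
    }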
procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON); + procedures.addBinding().toProvider(RemoveOrphanFiles.class).in(Scopes.SINGLETON); // for orc binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java index 36dda85989ea..1f1e5fdf5146 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java @@ -37,7 +37,8 @@ public enum IcebergErrorCode ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR), ICEBERG_INVALID_TABLE_TIMESTAMP(12, USER_ERROR), ICEBERG_ROLLBACK_ERROR(13, EXTERNAL), - ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR); + ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR), + ICEBERG_UNKNOWN_MANIFEST_TYPE(15, EXTERNAL); private final ErrorCode errorCode; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index dc1c31266b87..b079c89f14f3 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -66,6 +66,7 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.hive.HiveSchemaUtil; @@ -74,6 +75,7 @@ import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.SnapshotUtil; import java.io.IOException; @@ -1148,4 +1150,40 @@ public static Optional partitionDataFromStructLike(PartitionSpec } return partitionData; } + + /** + * Get the metadata location for target {@link Table}, + * considering iceberg table properties {@code WRITE_METADATA_LOCATION} + * */ + public static String metadataLocation(Table icebergTable) + { + String metadataLocation = icebergTable.properties().get(TableProperties.WRITE_METADATA_LOCATION); + + if (metadataLocation != null) { + return String.format("%s", LocationUtil.stripTrailingSlash(metadataLocation)); + } + else { + return String.format("%s/%s", icebergTable.location(), "metadata"); + } + } + + /** + * Get the data location for target {@link Table}, + * considering iceberg table properties {@code WRITE_DATA_LOCATION}, {@code OBJECT_STORE_PATH} and {@code WRITE_FOLDER_STORAGE_LOCATION} + * */ + public static String dataLocation(Table icebergTable) + { + Map properties = icebergTable.properties(); + String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION); + if (dataLocation == null) { + dataLocation = properties.get(TableProperties.OBJECT_STORE_PATH); + if (dataLocation == null) { + dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION); + if (dataLocation == null) { + dataLocation = String.format("%s/data", icebergTable.location()); + } + } + } + return dataLocation; + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RemoveOrphanFiles.java 
b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RemoveOrphanFiles.java new file mode 100644 index 000000000000..6bede7267f40 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/RemoveOrphanFiles.java @@ -0,0 +1,233 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.procedure; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.type.SqlTimestamp; +import com.facebook.presto.hive.HdfsContext; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.iceberg.IcebergAbstractMetadata; +import com.facebook.presto.iceberg.IcebergMetadataFactory; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.procedure.Procedure; +import com.facebook.presto.spi.procedure.Procedure.Argument; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestReader; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.io.IOException; +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle; +import static com.facebook.presto.common.type.StandardTypes.TIMESTAMP; +import static com.facebook.presto.common.type.StandardTypes.VARCHAR; +import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; +import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_UNKNOWN_MANIFEST_TYPE; +import static com.facebook.presto.iceberg.IcebergUtil.dataLocation; +import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable; +import static com.facebook.presto.iceberg.IcebergUtil.metadataLocation; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.DAYS; +import static org.apache.hadoop.fs.Path.SEPARATOR; +import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations; +import static org.apache.iceberg.ReachableFileUtil.statisticsFilesLocations; + +public class RemoveOrphanFiles + implements Provider +{ + private static final int REMOVE_UNUSED_FILES_OLDER_THAN_IN_DAYS = 3; + private static final int BATCH_DELETE_FILES_COUNT = 100; + private static final Logger LOGGER = Logger.get(RemoveOrphanFiles.class); + private 
static final MethodHandle DELETE_ORPHAN_FILES = methodHandle( + RemoveOrphanFiles.class, + "removeOrphanFiles", + ConnectorSession.class, + String.class, + String.class, + SqlTimestamp.class); + private final IcebergMetadataFactory metadataFactory; + private final HdfsEnvironment hdfsEnvironment; + + @Inject + public RemoveOrphanFiles(IcebergMetadataFactory metadataFactory, + HdfsEnvironment hdfsEnvironment) + { + this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null"); + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + } + + @Override + public Procedure get() + { + return new Procedure( + "system", + "remove_orphan_files", + ImmutableList.of( + new Argument("schema", VARCHAR), + new Argument("table_name", VARCHAR), + new Argument("older_than", TIMESTAMP, false, null)), + DELETE_ORPHAN_FILES.bindTo(this)); + } + + public void removeOrphanFiles(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + doRemoveOrphanFiles(clientSession, schema, tableName, olderThan); + } + } + + private void doRemoveOrphanFiles(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan) + { + IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) metadataFactory.create(); + SchemaTableName schemaTableName = new SchemaTableName(schema, tableName); + Table icebergTable = getIcebergTable(metadata, clientSession, schemaTableName); + + Set processedManifestFilePaths = new HashSet<>(); + ImmutableSet.Builder validMetadataFileNames = ImmutableSet.builder(); + ImmutableSet.Builder validDataFileNames = ImmutableSet.builder(); + + for (Snapshot snapshot : icebergTable.snapshots()) { + if (snapshot.manifestListLocation() != null) { + validMetadataFileNames.add(extractFileName(snapshot.manifestListLocation())); + } + + for (ManifestFile manifest : snapshot.allManifests(icebergTable.io())) { + if (!processedManifestFilePaths.add(manifest.path())) { + // Already read this manifest + continue; + } + + validMetadataFileNames.add(extractFileName(manifest.path())); + try (ManifestReader> manifestReader = readerForManifest(icebergTable, manifest)) { + for (ContentFile contentFile : manifestReader) { + validDataFileNames.add(extractFileName(contentFile.path().toString())); + } + } + catch (IOException e) { + throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Unable to list manifest file content from " + manifest.path(), e); + } + } + } + + metadataFileLocations(icebergTable, false).stream() + .map(RemoveOrphanFiles::extractFileName) + .forEach(validMetadataFileNames::add); + + statisticsFilesLocations(icebergTable).stream() + .map(RemoveOrphanFiles::extractFileName) + .forEach(validMetadataFileNames::add); + + // Always reserve `version-hint.text` as it's a shortcut to find the newest version + validMetadataFileNames.add("version-hint.text"); + + // Remove unused metadata and data files older than 3 days by default + // This default value is consistent with Spark procedure `remove_orphan_files` on Iceberg, see: + // https://iceberg.apache.org/docs/1.5.2/spark-procedures/#remove_orphan_files + long expiration = olderThan == null ? + System.currentTimeMillis() - DAYS.toMillis(REMOVE_UNUSED_FILES_OLDER_THAN_IN_DAYS) : + olderThan.isLegacyTimestamp() ? 
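                /*
                 * Cutoff note: with legacy timestamp semantics, SqlTimestamp carries an instant as UTC millis,
                 * while non-legacy values are zone-less and expose raw millis; both branches are assumed here
                 * to produce a value comparable to the HDFS modification times checked in scanAndDeleteInvalidFiles.
                 */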
+                olderThan.getMillisUtc() : olderThan.getMillis();
+        scanAndDeleteInvalidFiles(icebergTable, clientSession, schemaTableName, expiration, validDataFileNames.build(), dataLocation(icebergTable));
+        scanAndDeleteInvalidFiles(icebergTable, clientSession, schemaTableName, expiration, validMetadataFileNames.build(), metadataLocation(icebergTable));
+    }
+
+    private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expiration, Set<String> validFiles, String folderFullPath)
+    {
+        try {
+            List<String> filesToDelete = new ArrayList<>();
+            FileSystem fileSystem = getFileSystem(session, this.hdfsEnvironment, schemaTableName, new Path(table.location()));
+            Path fullPath = new Path(folderFullPath);
+            if (!fileSystem.exists(fullPath)) {
+                return;
+            }
+            RemoteIterator<LocatedFileStatus> allFiles = fileSystem.listFiles(fullPath, true);
+            while (allFiles.hasNext()) {
+                LocatedFileStatus entry = allFiles.next();
+                if (entry.getModificationTime() <= expiration && !validFiles.contains(entry.getPath().getName())) {
+                    filesToDelete.add(entry.getPath().toString());
+                    if (filesToDelete.size() >= BATCH_DELETE_FILES_COUNT) {
+                        LOGGER.debug("Deleting files while removing orphan files for table %s : %s", schemaTableName, filesToDelete);
+                        CatalogUtil.deleteFiles(table.io(), filesToDelete, folderFullPath, true);
+                        filesToDelete.clear();
+                    }
+                }
+                else {
+                    LOGGER.debug("%s retained while removing orphan files %s", entry.getPath().toString(), schemaTableName.getTableName());
+                }
+            }
+
+            if (!filesToDelete.isEmpty()) {
+                LOGGER.debug("Deleting files while removing orphan files for table %s : %s", schemaTableName, filesToDelete);
+                CatalogUtil.deleteFiles(table.io(), filesToDelete, folderFullPath, true);
+            }
+        }
+        catch (IOException e) {
+            throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed accessing data for table: " + schemaTableName, e);
+        }
+    }
+
+    private static String extractFileName(String path)
+    {
+        return path.substring(path.lastIndexOf(SEPARATOR) + 1);
+    }
+
+    private static FileSystem getFileSystem(ConnectorSession clientSession, HdfsEnvironment hdfsEnvironment, SchemaTableName schemaTableName, Path location)
+    {
+        HdfsContext hdfsContext = new HdfsContext(
+                clientSession,
+                schemaTableName.getSchemaName(),
+                schemaTableName.getTableName(),
+                location.getName(),
+                true);
+
+        try {
+            return hdfsEnvironment.getFileSystem(hdfsContext, location);
+        }
+        catch (Exception e) {
+            throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, format("Error getting file system at path %s", location), e);
+        }
+    }
+
+    private static ManifestReader<? extends ContentFile<?>> readerForManifest(Table table, ManifestFile manifest)
+    {
+        switch (manifest.content()) {
+            case DATA:
+                return ManifestFiles.read(manifest, table.io());
+            case DELETES:
+                return ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs());
+            default:
+                throw new PrestoException(ICEBERG_UNKNOWN_MANIFEST_TYPE, "Unknown manifest file content: " + manifest.content());
+        }
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureBase.java
new file mode 100644
index 000000000000..55589e8c00c0
--- /dev/null
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureBase.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.procedure; + +import com.facebook.presto.Session; +import com.facebook.presto.Session.SessionBuilder; +import com.facebook.presto.common.type.TimeZoneKey; +import com.facebook.presto.hive.HdfsConfiguration; +import com.facebook.presto.hive.HdfsConfigurationInitializer; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.HiveHdfsConfiguration; +import com.facebook.presto.hive.MetastoreClientConfig; +import com.facebook.presto.hive.authentication.NoHdfsAuthentication; +import com.facebook.presto.iceberg.CatalogType; +import com.facebook.presto.iceberg.IcebergConfig; +import com.facebook.presto.iceberg.IcebergQueryRunner; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.iceberg.Table; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Map; + +import static com.facebook.presto.SystemSessionProperties.LEGACY_TIMESTAMP; +import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath; +import static com.facebook.presto.iceberg.IcebergUtil.dataLocation; +import static com.facebook.presto.iceberg.IcebergUtil.metadataLocation; +import static com.facebook.presto.iceberg.procedure.RegisterTableProcedure.getFileSystem; +import static com.google.common.io.Files.createTempDir; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; +import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +public abstract class TestRemoveOrphanFilesProcedureBase + extends AbstractTestQueryFramework +{ + public static final String ICEBERG_CATALOG = "iceberg"; + public static final String TEST_SCHEMA = "tpch"; + + private final CatalogType catalogType; + private final Map extraConnectorProperties; + + protected TestRemoveOrphanFilesProcedureBase(CatalogType catalogType, Map extraConnectorProperties) + { + this.catalogType = requireNonNull(catalogType, "catalogType is null"); + this.extraConnectorProperties = requireNonNull(extraConnectorProperties, "extraConnectorProperties is null"); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.createIcebergQueryRunner(ImmutableMap.of(), 
catalogType, extraConnectorProperties); + } + + @DataProvider(name = "timezones") + public Object[][] timezones() + { + return new Object[][] { + {"UTC", true}, + {"America/Los_Angeles", true}, + {"Asia/Shanghai", true}, + {"UTC", false}}; + } + + @Test + public void testRemoveOrphanFilesInEmptyTable() + { + String tableName = "test_empty_table"; + Session session = getQueryRunner().getDefaultSession(); + try { + assertUpdate(format("create table %s (a int, b varchar)", tableName)); + + Table table = loadTable(tableName); + int metadataFilesCountBefore = allMetadataFilesCount(session, table); + int dataFilesCountBefore = allDataFilesCount(session, table); + assertUpdate(format("call iceberg.system.remove_orphan_files('%s', '%s')", TEST_SCHEMA, tableName)); + + int metadataFilesCountAfter = allMetadataFilesCount(session, table); + int dataFilesCountAfter = allDataFilesCount(session, table); + assertEquals(metadataFilesCountBefore, metadataFilesCountAfter); + assertEquals(dataFilesCountBefore, dataFilesCountAfter); + assertQuery("select count(*) from " + tableName, "values(0)"); + } + finally { + dropTable(tableName); + } + } + + @Test(dataProvider = "timezones") + public void testRemoveOrphanFilesInMetadataAndDataFolder(String zoneId, boolean legacyTimestamp) + { + String tableName = "test_table"; + Session session = sessionForTimezone(zoneId, legacyTimestamp); + try { + assertUpdate(session, format("create table %s (a int, b varchar)", tableName)); + assertUpdate(session, format("insert into %s values(1, '1001'), (2, '1002')", tableName), 2); + assertUpdate(session, format("insert into %s values(3, '1003'), (4, '1004')", tableName), 2); + assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003'), (4, '1004')"); + + Table table = loadTable(tableName); + int metadataFilesCountBefore = allMetadataFilesCount(session, table); + int dataFilesCountBefore = allDataFilesCount(session, table); + + // Generate 3 files in iceberg table data location + assertTrue(generateFile(session, TEST_SCHEMA, tableName, dataLocation(table), "file_1.test")); + assertTrue(generateFile(session, TEST_SCHEMA, tableName, dataLocation(table), "file_2.test")); + assertTrue(generateFile(session, TEST_SCHEMA, tableName, dataLocation(table), "file_3.test")); + + // Generate 2 files in iceberg table metadata location + assertTrue(generateFile(session, TEST_SCHEMA, tableName, metadataLocation(table), "file_4.test")); + assertTrue(generateFile(session, TEST_SCHEMA, tableName, metadataLocation(table), "file_5.test")); + + int metadataFilesCountMiddle = allMetadataFilesCount(session, table); + int dataFilesCountMiddle = allDataFilesCount(session, table); + + // Remove all orphan files older than now + waitUntilAfter(System.currentTimeMillis()); + String formattedDateTime = getTimestampString(System.currentTimeMillis(), zoneId); + assertUpdate(session, format("call iceberg.system.remove_orphan_files('%s', '%s', timestamp '%s')", TEST_SCHEMA, tableName, formattedDateTime)); + + int metadataFilesCountAfter = allMetadataFilesCount(session, table); + int dataFilesCountAfter = allDataFilesCount(session, table); + + assertEquals(metadataFilesCountBefore, metadataFilesCountAfter); + assertEquals(dataFilesCountBefore, dataFilesCountAfter); + + assertEquals(metadataFilesCountBefore + 2, metadataFilesCountMiddle); + assertEquals(dataFilesCountBefore + 3, dataFilesCountMiddle); + + assertEquals(metadataFilesCountMiddle, metadataFilesCountAfter + 2); + assertEquals(dataFilesCountMiddle, 
dataFilesCountAfter + 3); + assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003'), (4, '1004')"); + } + finally { + dropTable(tableName); + } + } + + @Test(dataProvider = "timezones") + public void testRemoveOrphanFilesWithNonDefaultMetadataPath(String zoneId, boolean legacyTimestamp) + { + Session session = sessionForTimezone(zoneId, legacyTimestamp); + String tempTableName = "temp_test_table"; + String tableName = "test_table"; + String tableTargetPath = createTempDir().toURI().toString(); + File metadataDir = new File(createTempDir().getAbsolutePath(), "metadata"); + metadataDir.mkdir(); + String specifiedMetadataPath = metadataDir.getAbsolutePath(); + + // Create an iceberg table using specified table properties + Table table = createTable(tempTableName, tableTargetPath, + ImmutableMap.of(WRITE_METADATA_LOCATION, specifiedMetadataPath)); + assertNotNull(table.properties().get(WRITE_METADATA_LOCATION)); + assertEquals(table.properties().get(WRITE_METADATA_LOCATION), specifiedMetadataPath); + + assertUpdate(session, format("CALL system.register_table('%s', '%s', '%s')", TEST_SCHEMA, tableName, metadataLocation(table))); + assertUpdate(session, "insert into " + tableName + " values(1, '1001'), (2, '1002')", 2); + assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002')"); + + int metadataFilesCountBefore = allMetadataFilesCount(session, table); + int dataFilesCountBefore = allDataFilesCount(session, table); + assertTrue(generateFile(session, TEST_SCHEMA, tableName, specifiedMetadataPath, "metadata_file_1.test")); + assertTrue(generateFile(session, TEST_SCHEMA, tableName, specifiedMetadataPath, "metadata_file_2.test")); + + int metadataFilesCountMiddle = allMetadataFilesCount(session, table); + int dataFilesCountMiddle = allDataFilesCount(session, table); + + // Remove all orphan files older than now + waitUntilAfter(System.currentTimeMillis()); + String formattedDateTime = getTimestampString(System.currentTimeMillis(), zoneId); + assertUpdate(session, format("call system.remove_orphan_files('%s', '%s', timestamp '%s')", TEST_SCHEMA, tableName, formattedDateTime)); + + int metadataFilesCountAfter = allMetadataFilesCount(session, table); + int dataFilesCountAfter = allDataFilesCount(session, table); + + assertEquals(metadataFilesCountBefore, metadataFilesCountAfter); + assertEquals(dataFilesCountBefore, dataFilesCountAfter); + + assertEquals(metadataFilesCountBefore + 2, metadataFilesCountMiddle); + assertEquals(dataFilesCountBefore, dataFilesCountMiddle); + + assertEquals(metadataFilesCountMiddle, metadataFilesCountAfter + 2); + assertEquals(dataFilesCountMiddle, dataFilesCountAfter); + assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002')"); + + assertUpdate(format("CALL system.unregister_table('%s', '%s')", TEST_SCHEMA, tableName)); + dropTableFromCatalog(tempTableName); + } + + @Test(dataProvider = "timezones") + public void testRemoveOrphanFilesWithNonDefaultDataPath(String zoneId, boolean legacyTimestamp) + { + Session session = sessionForTimezone(zoneId, legacyTimestamp); + String tempTableName = "temp_test_table"; + String tableName = "test_table"; + String tableTargetPath = createTempDir().toURI().toString(); + File dataDir = new File(createTempDir().getAbsolutePath(), "metadata"); + dataDir.mkdir(); + String specifiedDataPath = dataDir.getAbsolutePath(); + + // Create an iceberg table using specified table properties + Table table = createTable(tempTableName, tableTargetPath, + 
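                    // WRITE_DATA_LOCATION is expected to make dataLocation(table) resolve to specifiedDataPath, so the
                    // orphan files generated below are counted as data files, while metadata stays under the default
                    // metadata location and its file count is asserted to remain unchanged.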
ImmutableMap.of(WRITE_DATA_LOCATION, specifiedDataPath)); + assertNotNull(table.properties().get(WRITE_DATA_LOCATION)); + assertEquals(table.properties().get(WRITE_DATA_LOCATION), specifiedDataPath); + + assertUpdate(session, format("CALL system.register_table('%s', '%s', '%s')", TEST_SCHEMA, tableName, metadataLocation(table))); + assertUpdate(session, "insert into " + tableName + " values(1, '1001'), (2, '1002')", 2); + assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002')"); + + int metadataFilesCountBefore = allMetadataFilesCount(session, table); + int dataFilesCountBefore = allDataFilesCount(session, table); + assertTrue(generateFile(session, TEST_SCHEMA, tableName, specifiedDataPath, "metadata_file_1.test")); + assertTrue(generateFile(session, TEST_SCHEMA, tableName, specifiedDataPath, "metadata_file_2.test")); + + int metadataFilesCountMiddle = allMetadataFilesCount(session, table); + int dataFilesCountMiddle = allDataFilesCount(session, table); + + // Remove all orphan files older than now + waitUntilAfter(System.currentTimeMillis()); + String formattedDateTime = getTimestampString(System.currentTimeMillis(), zoneId); + assertUpdate(session, format("call system.remove_orphan_files('%s', '%s', timestamp '%s')", TEST_SCHEMA, tableName, formattedDateTime)); + + int metadataFilesCountAfter = allMetadataFilesCount(session, table); + int dataFilesCountAfter = allDataFilesCount(session, table); + + assertEquals(metadataFilesCountBefore, metadataFilesCountAfter); + assertEquals(dataFilesCountBefore, dataFilesCountAfter); + + assertEquals(metadataFilesCountBefore, metadataFilesCountMiddle); + assertEquals(dataFilesCountBefore + 2, dataFilesCountMiddle); + + assertEquals(metadataFilesCountMiddle, metadataFilesCountAfter); + assertEquals(dataFilesCountMiddle, dataFilesCountAfter + 2); + assertQuery(session, "select * from " + tableName, "values(1, '1001'), (2, '1002')"); + + assertUpdate(format("CALL system.unregister_table('%s', '%s')", TEST_SCHEMA, tableName)); + dropTableFromCatalog(tempTableName); + } + + abstract Table createTable(String tableName, String targetPath, Map tableProperties); + + abstract Table loadTable(String tableName); + + abstract void dropTableFromCatalog(String tableName); + + private void dropTable(String tableName) + { + assertQuerySucceeds("DROP TABLE IF EXISTS " + TEST_SCHEMA + "." 
+ tableName); + } + + private Session sessionForTimezone(String zoneId, boolean legacyTimestamp) + { + SessionBuilder sessionBuilder = Session.builder(getSession()) + .setSystemProperty(LEGACY_TIMESTAMP, String.valueOf(legacyTimestamp)); + if (legacyTimestamp) { + sessionBuilder.setTimeZoneKey(TimeZoneKey.getTimeZoneKey(zoneId)); + } + return sessionBuilder.build(); + } + + protected File getCatalogDirectory(CatalogType type) + { + Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); + Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, type.name(), new IcebergConfig().getFileFormat(), false); + return catalogDirectory.toFile(); + } + + protected static HdfsEnvironment getHdfsEnvironment() + { + HiveClientConfig hiveClientConfig = new HiveClientConfig(); + MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig(); + HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration( + new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig), + ImmutableSet.of(), + hiveClientConfig); + return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication()); + } + + private static long waitUntilAfter(long snapshotTimeMillis) + { + long currentTimeMillis = System.currentTimeMillis(); + assertTrue(snapshotTimeMillis - currentTimeMillis <= 10, + format("Snapshot time %s is greater than the current time %s by more than 10ms", snapshotTimeMillis, currentTimeMillis)); + + while (currentTimeMillis <= snapshotTimeMillis) { + currentTimeMillis = System.currentTimeMillis(); + } + return currentTimeMillis; + } + + private static int allMetadataFilesCount(Session session, Table table) + { + return allFilesCount(session.toConnectorSession(), TEST_SCHEMA, table.name(), metadataLocation(table)); + } + + private static int allDataFilesCount(Session session, Table table) + { + return allFilesCount(session.toConnectorSession(), TEST_SCHEMA, table.name(), dataLocation(table)); + } + + private static int allFilesCount(ConnectorSession session, String schema, String table, String folderFullPath) + { + try { + org.apache.hadoop.fs.Path fullPath = new org.apache.hadoop.fs.Path(folderFullPath); + FileSystem fileSystem = getFileSystem(session, getHdfsEnvironment(), new SchemaTableName(schema, table), fullPath); + RemoteIterator allFiles = fileSystem.listFiles(fullPath, true); + int count = 0; + while (allFiles.hasNext()) { + allFiles.next(); + count++; + } + return count; + } + catch (Exception e) { + return 0; + } + } + + private static boolean generateFile(Session session, String schema, String table, String path, String fileName) + { + try { + ConnectorSession connectorSession = session.toConnectorSession(); + FileSystem fileSystem = getFileSystem(connectorSession, getHdfsEnvironment(), new SchemaTableName(schema, table), new org.apache.hadoop.fs.Path(path)); + fileSystem.createNewFile(new org.apache.hadoop.fs.Path(path, fileName)); + } + catch (IOException e) { + return false; + } + return true; + } + + private static String getTimestampString(long timeMillsUtc, String zoneId) + { + Instant instant = Instant.ofEpochMilli(timeMillsUtc); + LocalDateTime localDateTime = instant + .atZone(ZoneId.of(zoneId)) + .toLocalDateTime(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); + formatter = formatter.withZone(ZoneId.of(zoneId)); + return localDateTime.format(formatter); + } +} diff --git 
a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHadoop.java new file mode 100644 index 000000000000..6a2835694ecc --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHadoop.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.procedure; + +import com.facebook.presto.spi.ColumnMetadata; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; + +import java.io.File; +import java.util.Map; + +import static com.facebook.presto.common.type.IntegerType.INTEGER; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static com.facebook.presto.iceberg.CatalogType.HADOOP; +import static com.facebook.presto.iceberg.IcebergAbstractMetadata.toIcebergSchema; +import static com.google.common.io.Files.createTempDir; +import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; +import static org.assertj.core.api.Fail.fail; +import static org.testng.Assert.assertEquals; + +public class TestRemoveOrphanFilesProcedureHadoop + extends TestRemoveOrphanFilesProcedureBase +{ + public TestRemoveOrphanFilesProcedureHadoop() + { + super(HADOOP, ImmutableMap.of()); + } + + @Override + public void testRemoveOrphanFilesWithNonDefaultMetadataPath(String zoneId, boolean legacyTimestamp) + { + String tempTableName = "temp_test_table"; + String tableTargetPath = createTempDir().toURI().toString(); + String specifiedMetadataPath = createTempDir().getAbsolutePath(); + + // Hadoop doesn't support set table property `WRITE_METADATA_LOCATION` on table creation + try { + createTable(tempTableName, tableTargetPath, ImmutableMap.of(WRITE_METADATA_LOCATION, specifiedMetadataPath)); + fail("Should fail: Hadoop path-based tables cannot relocate metadata"); + } + catch (IllegalArgumentException e) { + assertEquals("Hadoop path-based tables cannot relocate metadata", e.getMessage()); + } + } + + @Override + Table createTable(String tableName, String targetPath, Map tableProperties) + { + Catalog catalog = CatalogUtil.loadCatalog(HADOOP.getCatalogImpl(), ICEBERG_CATALOG, getProperties(), new Configuration()); + return catalog.createTable(TableIdentifier.of(TEST_SCHEMA, tableName), + toIcebergSchema(ImmutableList.of(new ColumnMetadata("a", INTEGER), new ColumnMetadata("b", VARCHAR))), + null, + tableProperties); + } + + @Override + Table loadTable(String tableName) + { + Catalog catalog = CatalogUtil.loadCatalog(HADOOP.getCatalogImpl(), ICEBERG_CATALOG, getProperties(), new Configuration()); + return catalog.loadTable(TableIdentifier.of(TEST_SCHEMA, tableName)); 
+ } + + @Override + void dropTableFromCatalog(String tableName) + { + Catalog catalog = CatalogUtil.loadCatalog(HADOOP.getCatalogImpl(), ICEBERG_CATALOG, getProperties(), new Configuration()); + catalog.dropTable(TableIdentifier.of(TEST_SCHEMA, tableName)); + } + + private Map getProperties() + { + File metastoreDir = getCatalogDirectory(HADOOP); + return ImmutableMap.of("warehouse", metastoreDir.toString()); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java new file mode 100644 index 000000000000..51b95f0947ce --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java @@ -0,0 +1,116 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.procedure; + +import com.facebook.presto.hive.HdfsContext; +import com.facebook.presto.hive.HiveColumnConverterProvider; +import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; +import com.facebook.presto.hive.metastore.MetastoreContext; +import com.facebook.presto.hive.metastore.file.FileHiveMetastore; +import com.facebook.presto.iceberg.HiveTableOperations; +import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig; +import com.facebook.presto.iceberg.IcebergUtil; +import com.facebook.presto.metadata.CatalogManager; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; + +import java.util.Map; + +import static com.facebook.presto.common.type.IntegerType.INTEGER; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore.memoizeMetastore; +import static com.facebook.presto.hive.metastore.MetastoreUtil.getMetastoreHeaders; +import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled; +import static com.facebook.presto.iceberg.CatalogType.HIVE; +import static com.facebook.presto.iceberg.IcebergAbstractMetadata.toIcebergSchema; +import static org.apache.iceberg.TableMetadata.newTableMetadata; +import static org.apache.iceberg.Transactions.createTableTransaction; + +public class TestRemoveOrphanFilesProcedureHive + extends TestRemoveOrphanFilesProcedureBase +{ + public TestRemoveOrphanFilesProcedureHive() + { + super(HIVE, ImmutableMap.of()); + } + + @Override + Table createTable(String tableName, String targetPath, Map tableProperties) + { + CatalogManager catalogManager = 
getDistributedQueryRunner().getCoordinator().getCatalogManager(); + ConnectorId connectorId = catalogManager.getCatalog(ICEBERG_CATALOG).get().getConnectorId(); + ConnectorSession session = getQueryRunner().getDefaultSession().toConnectorSession(connectorId); + + MetastoreContext context = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), isUserDefinedTypeEncodingEnabled(session), HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats()); + HdfsContext hdfsContext = new HdfsContext(session, "tpch", tableName); + TableOperations operations = new HiveTableOperations( + getFileHiveMetastore(), + context, + getHdfsEnvironment(), + hdfsContext, + new IcebergHiveTableOperationsConfig(), + "tpch", + tableName, + session.getUser(), + targetPath); + TableMetadata metadata = newTableMetadata( + toIcebergSchema(ImmutableList.of(new ColumnMetadata("a", INTEGER), new ColumnMetadata("b", VARCHAR))), + PartitionSpec.unpartitioned(), targetPath, + tableProperties); + Transaction transaction = createTableTransaction(tableName, operations, metadata); + transaction.commitTransaction(); + return transaction.table(); + } + + @Override + Table loadTable(String tableName) + { + CatalogManager catalogManager = getDistributedQueryRunner().getCoordinator().getCatalogManager(); + ConnectorId connectorId = catalogManager.getCatalog(ICEBERG_CATALOG).get().getConnectorId(); + + return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(), + getHdfsEnvironment(), + new IcebergHiveTableOperationsConfig(), + getQueryRunner().getDefaultSession().toConnectorSession(connectorId), + SchemaTableName.valueOf("tpch." + tableName)); + } + + @Override + void dropTableFromCatalog(String tableName) + { + CatalogManager catalogManager = getDistributedQueryRunner().getCoordinator().getCatalogManager(); + ConnectorId connectorId = catalogManager.getCatalog(ICEBERG_CATALOG).get().getConnectorId(); + ConnectorSession session = getQueryRunner().getDefaultSession().toConnectorSession(connectorId); + + MetastoreContext context = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), isUserDefinedTypeEncodingEnabled(session), HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats()); + getFileHiveMetastore().dropTable(context, "tpch", tableName, true); + } + + private ExtendedHiveMetastore getFileHiveMetastore() + { + FileHiveMetastore fileHiveMetastore = new FileHiveMetastore(getHdfsEnvironment(), + getCatalogDirectory(HIVE).getPath(), + "test"); + return memoizeMetastore(fileHiveMetastore, false, 1000, 0); + } +}
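The procedure can also be exercised from a client application. Below is a minimal JDBC sketch, assuming the presto-jdbc driver is on the classpath and a coordinator is reachable at ``localhost:8080``; the catalog, schema, and table names mirror the documentation examples above and should be adjusted to the target deployment::

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public final class RemoveOrphanFilesExample
    {
        public static void main(String[] args)
                throws Exception
        {
            // Coordinator URL, user, and identifiers are assumptions for illustration only.
            try (Connection connection = DriverManager.getConnection("jdbc:presto://localhost:8080/iceberg/db", "test", null);
                    Statement statement = connection.createStatement()) {
                // Remove files unreferenced by db.sample that are older than the given timestamp;
                // omit older_than to fall back to the three-day default.
                statement.execute("CALL iceberg.system.remove_orphan_files(schema => 'db', table_name => 'sample', older_than => TIMESTAMP '2023-08-31 00:00:00.000')");
            }
        }
    }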