Skip to content

Commit

Permalink
[Iceberg] Support procedure remove_orphan_files
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed Jul 21, 2024
1 parent 06e7d50 commit e644017
Show file tree
Hide file tree
Showing 5 changed files with 534 additions and 1 deletion.
26 changes: 26 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,32 @@ Examples:

CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);

Remove Orphan Files
^^^^^^^^^^^^^^^^^^^

Used to remove files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned".

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name required type Description
===================== ========== =============== =======================================================================
``schema`` ✔️ string Schema of the table to clean

``table_name`` ✔️ string Name of the table to clean

``older_than`` timestamp Remove orphan files created before this timestamp (Default: 3 days ago)
===================== ========== =============== =======================================================================

Examples:

* Remove any files which are not known to the table `db.sample` and older than specified timestamp::

CALL iceberg.system.remove_orphan_files('db', 'sample', TIMESTAMP '2023-08-31 00:00:00.000');

* Remove any files which are not known to the table `db.sample` and created 3 days ago (by default)::

CALL iceberg.system.remove_orphan_files(schema => 'db', table_name => 'sample');

SQL Support
-----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles;
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
import com.facebook.presto.orc.CachingStripeMetadataSource;
Expand Down Expand Up @@ -154,6 +155,7 @@ public void setup(Binder binder)
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RemoveOrphanFiles.class).in(Scopes.SINGLETON);

// for orc
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum IcebergErrorCode
ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
ICEBERG_INVALID_TABLE_TIMESTAMP(12, USER_ERROR),
ICEBERG_ROLLBACK_ERROR(13, EXTERNAL),
ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR);
ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR),
ICEBERG_UNKNOWN_MANIFEST_TYPE(15, EXTERNAL);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.procedure;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.type.SqlTimestamp;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.iceberg.IcebergAbstractMetadata;
import com.facebook.presto.iceberg.IcebergMetadataFactory;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

import javax.inject.Inject;
import javax.inject.Provider;

import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
import static com.facebook.presto.common.type.StandardTypes.TIMESTAMP;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_UNKNOWN_MANIFEST_TYPE;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.DAYS;
import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations;
import static org.apache.iceberg.ReachableFileUtil.statisticsFilesLocations;

public class RemoveOrphanFiles
implements Provider<Procedure>
{
private static final int REMOVE_UNUSED_FILES_OLDER_THAN_IN_DAYS = 3;
private static final int BATCH_DELETE_FILES_COUNT = 100;
private static final Logger LOGGER = Logger.get(RemoveOrphanFiles.class);
private static final MethodHandle DELETE_ORPHAN_FILES = methodHandle(
RemoveOrphanFiles.class,
"removeOrphanFiles",
ConnectorSession.class,
String.class,
String.class,
SqlTimestamp.class);
private final IcebergMetadataFactory metadataFactory;
private final HdfsEnvironment hdfsEnvironment;

@Inject
public RemoveOrphanFiles(IcebergMetadataFactory metadataFactory,
HdfsEnvironment hdfsEnvironment)
{
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
}

@Override
public Procedure get()
{
return new Procedure(
"system",
"remove_orphan_files",
ImmutableList.of(
new Procedure.Argument("schema", VARCHAR),
new Procedure.Argument("table_name", VARCHAR),
new Procedure.Argument("older_than", TIMESTAMP, false, null)),
DELETE_ORPHAN_FILES.bindTo(this));
}

public void removeOrphanFiles(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
doRemoveOrphanFiles(clientSession, schema, tableName, olderThan);
}
}

private void doRemoveOrphanFiles(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan)
{
IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) metadataFactory.create();
SchemaTableName schemaTableName = new SchemaTableName(schema, tableName);
Table icebergTable = IcebergUtil.getIcebergTable(metadata, clientSession, schemaTableName);

Set<String> processedManifestFilePaths = new HashSet<>();
ImmutableSet.Builder<String> validMetadataFileNames = ImmutableSet.builder();
ImmutableSet.Builder<String> validDataFileNames = ImmutableSet.builder();

for (Snapshot snapshot : icebergTable.snapshots()) {
if (snapshot.manifestListLocation() != null) {
validMetadataFileNames.add(extractFileName(snapshot.manifestListLocation()));
}

for (ManifestFile manifest : snapshot.allManifests(icebergTable.io())) {
if (!processedManifestFilePaths.add(manifest.path())) {
// Already read this manifest
continue;
}

validMetadataFileNames.add(extractFileName(manifest.path()));
try (ManifestReader<? extends ContentFile<?>> manifestReader = readerForManifest(icebergTable, manifest)) {
for (ContentFile<?> contentFile : manifestReader) {
validDataFileNames.add(extractFileName(contentFile.path().toString()));
}
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Unable to list manifest file content from " + manifest.path(), e);
}
}
}

metadataFileLocations(icebergTable, false).stream()
.map(this::extractFileName)
.forEach(validMetadataFileNames::add);

statisticsFilesLocations(icebergTable).stream()
.map(this::extractFileName)
.forEach(validMetadataFileNames::add);

validMetadataFileNames.add("version-hint.text");

// Remove unused metadata and data files older than 3 days by default
long expiration = olderThan == null ?
System.currentTimeMillis() - DAYS.toMillis(REMOVE_UNUSED_FILES_OLDER_THAN_IN_DAYS) :
olderThan.isLegacyTimestamp() ? olderThan.getMillisUtc() : olderThan.getMillis();
scanAndDeleteInvalidFiles(icebergTable, clientSession, schemaTableName, expiration, validDataFileNames.build(), "data");
scanAndDeleteInvalidFiles(icebergTable, clientSession, schemaTableName, expiration, validMetadataFileNames.build(), "metadata");
}

private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expiration, Set<String> validFiles, String subFolder)
{
try {
List<String> filesToDelete = new ArrayList<>();
FileSystem fileSystem = getFileSystem(session, this.hdfsEnvironment, schemaTableName, new Path(table.location()));
Path subFolderPath = new Path(table.location(), subFolder);
if (!fileSystem.exists(subFolderPath)) {
return;
}
RemoteIterator<LocatedFileStatus> allFiles = fileSystem.listFiles(subFolderPath, true);
while (allFiles.hasNext()) {
LocatedFileStatus entry = allFiles.next();
if (entry.getModificationTime() <= expiration && !validFiles.contains(entry.getPath().getName())) {
filesToDelete.add(entry.getPath().toString());
if (filesToDelete.size() >= BATCH_DELETE_FILES_COUNT) {
LOGGER.debug("Deleting %s files while removing orphan files for table %s", filesToDelete, schemaTableName);
CatalogUtil.deleteFiles(table.io(), filesToDelete, subFolder, true);
filesToDelete.clear();
}
}
else {
LOGGER.debug("%s retained while removing orphan files %s", entry.getPath().toString(), schemaTableName.getTableName());
}
}

if (!filesToDelete.isEmpty()) {
LOGGER.debug("Deleting %s files while removing orphan files for table %s", filesToDelete, schemaTableName);
CatalogUtil.deleteFiles(table.io(), filesToDelete, subFolder, true);
}
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed accessing data for table: " + schemaTableName, e);
}
}

private String extractFileName(String path)
{
return path.substring(path.lastIndexOf('/') + 1);
}

private static FileSystem getFileSystem(ConnectorSession clientSession, HdfsEnvironment hdfsEnvironment, SchemaTableName schemaTableName, Path location)
{
HdfsContext hdfsContext = new HdfsContext(
clientSession,
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
location.getName(),
true);

try {
return hdfsEnvironment.getFileSystem(hdfsContext, location);
}
catch (Exception e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, format("Error getting file system at path %s", location), e);
}
}

private static ManifestReader<? extends ContentFile<?>> readerForManifest(Table table, ManifestFile manifest)
{
switch (manifest.content()) {
case DATA:
return ManifestFiles.read(manifest, table.io());
case DELETES:
ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs());
default:
throw new PrestoException(ICEBERG_UNKNOWN_MANIFEST_TYPE, "Unknown manifest file content: " + manifest.content());
}
}
}
Loading

0 comments on commit e644017

Please sign in to comment.