Skip to content

Commit

Permalink
[Iceberg] Support procedure remove_orphan_files
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed Jul 30, 2024
1 parent 5e9baaf commit 755e834
Show file tree
Hide file tree
Showing 6 changed files with 570 additions and 1 deletion.
26 changes: 26 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,32 @@ Examples:

CALL iceberg.system.expire_snapshots(schema => 'schema_name', table_name => 'table_name', snapshot_ids => ARRAY[10001, 10002]);

Remove Orphan Files
^^^^^^^^^^^^^^^^^^^

Removes files that are not referenced by any metadata file of an Iceberg table.

The following arguments are available:

===================== ========== =============== =======================================================================
Argument Name required type Description
===================== ========== =============== =======================================================================
``schema`` ✔️ string Schema of the table to clean

``table_name`` ✔️ string Name of the table to clean

``older_than`` timestamp Remove orphan files created before this timestamp (Default: 3 days ago)
===================== ========== =============== =======================================================================

Examples:

* Remove any files which are not known to the table ``db.sample`` and are older than the specified timestamp::

CALL iceberg.system.remove_orphan_files('db', 'sample', TIMESTAMP '2023-08-31 00:00:00.000');

* Remove any files which are not known to the table ``db.sample`` and are older than 3 days (the default)::

CALL iceberg.system.remove_orphan_files(schema => 'db', table_name => 'sample');

SQL Support
-----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles;
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
import com.facebook.presto.orc.CachingStripeMetadataSource;
Expand Down Expand Up @@ -154,6 +155,7 @@ public void setup(Binder binder)
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RemoveOrphanFiles.class).in(Scopes.SINGLETON);

// for orc
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum IcebergErrorCode
ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
ICEBERG_INVALID_TABLE_TIMESTAMP(12, USER_ERROR),
ICEBERG_ROLLBACK_ERROR(13, EXTERNAL),
ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR);
ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR),
ICEBERG_UNKNOWN_MANIFEST_TYPE(15, EXTERNAL);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hive.HiveSchemaUtil;
Expand All @@ -74,6 +75,7 @@
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.SnapshotUtil;

import java.io.IOException;
Expand Down Expand Up @@ -1148,4 +1150,40 @@ public static Optional<PartitionData> partitionDataFromStructLike(PartitionSpec
}
return partitionData;
}

/**
 * Resolve the directory that holds the metadata files of the given {@link Table}.
 * <p>
 * The table property {@code WRITE_METADATA_LOCATION} takes precedence, with any
 * trailing slash stripped; when it is not set, the {@code metadata} folder directly
 * under the table location is used.
 *
 * @param icebergTable table whose metadata location is wanted
 * @return the metadata directory location
 */
public static String metadataLocation(Table icebergTable)
{
    String configuredLocation = icebergTable.properties().get(TableProperties.WRITE_METADATA_LOCATION);

    // No explicit override: fall back to the conventional "<table location>/metadata" layout
    if (configuredLocation == null) {
        return icebergTable.location() + "/metadata";
    }
    return LocationUtil.stripTrailingSlash(configuredLocation);
}

/**
 * Resolve the directory that holds the data files of the given {@link Table}.
 * <p>
 * Table properties are consulted in this order, and the first one that is set wins:
 * {@code WRITE_DATA_LOCATION}, {@code OBJECT_STORE_PATH}, {@code WRITE_FOLDER_STORAGE_LOCATION}.
 * When none of them is set, the {@code data} folder directly under the table location is used.
 *
 * @param icebergTable table whose data location is wanted
 * @return the data directory location
 */
public static String dataLocation(Table icebergTable)
{
    Map<String, String> tableProperties = icebergTable.properties();

    String writeDataLocation = tableProperties.get(TableProperties.WRITE_DATA_LOCATION);
    if (writeDataLocation != null) {
        return writeDataLocation;
    }

    String objectStorePath = tableProperties.get(TableProperties.OBJECT_STORE_PATH);
    if (objectStorePath != null) {
        return objectStorePath;
    }

    String folderStorageLocation = tableProperties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
    if (folderStorageLocation != null) {
        return folderStorageLocation;
    }

    // Nothing configured: use the conventional "<table location>/data" layout
    return icebergTable.location() + "/data";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.procedure;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.type.SqlTimestamp;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.iceberg.IcebergAbstractMetadata;
import com.facebook.presto.iceberg.IcebergMetadataFactory;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.procedure.Procedure.Argument;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

import javax.inject.Inject;
import javax.inject.Provider;

import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
import static com.facebook.presto.common.type.StandardTypes.TIMESTAMP;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_UNKNOWN_MANIFEST_TYPE;
import static com.facebook.presto.iceberg.IcebergUtil.dataLocation;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.metadataLocation;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.DAYS;
import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations;
import static org.apache.iceberg.ReachableFileUtil.statisticsFilesLocations;

/**
 * Provider of the Iceberg procedure {@code system.remove_orphan_files}.
 * <p>
 * The procedure collects the file names referenced by the table (manifest lists, manifests,
 * content files listed in manifests, metadata files, statistics files and {@code version-hint.text}),
 * then recursively lists the table's data and metadata locations and deletes every file whose
 * modification time is at or before the cutoff and whose name is not in the referenced set.
 * <p>
 * Note that files are matched by their last path segment (file name) only, not by full path.
 */
public class RemoveOrphanFiles
        implements Provider<Procedure>
{
    // Default cutoff age used when the caller does not pass `older_than`
    private static final int REMOVE_UNUSED_FILES_OLDER_THAN_IN_DAYS = 3;
    // Maximum number of paths accumulated before a delete batch is issued
    private static final int BATCH_DELETE_FILES_COUNT = 100;
    private static final Logger LOGGER = Logger.get(RemoveOrphanFiles.class);
    private static final MethodHandle DELETE_ORPHAN_FILES = methodHandle(
            RemoveOrphanFiles.class,
            "removeOrphanFiles",
            ConnectorSession.class,
            String.class,
            String.class,
            SqlTimestamp.class);
    private final IcebergMetadataFactory metadataFactory;
    private final HdfsEnvironment hdfsEnvironment;

    @Inject
    public RemoveOrphanFiles(IcebergMetadataFactory metadataFactory,
            HdfsEnvironment hdfsEnvironment)
    {
        this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
    }

    /**
     * Build the procedure definition: {@code system.remove_orphan_files(schema, table_name, older_than)},
     * where {@code older_than} is optional and defaults to {@code null}.
     */
    @Override
    public Procedure get()
    {
        return new Procedure(
                "system",
                "remove_orphan_files",
                ImmutableList.of(
                        new Argument("schema", VARCHAR),
                        new Argument("table_name", VARCHAR),
                        new Argument("older_than", TIMESTAMP, false, null)),
                DELETE_ORPHAN_FILES.bindTo(this));
    }

    /**
     * Procedure entry point, invoked through {@link #DELETE_ORPHAN_FILES}.
     * Runs the actual work with this class's class loader installed as the thread context class loader.
     *
     * @param clientSession session of the caller
     * @param schema schema of the table to clean
     * @param tableName name of the table to clean
     * @param olderThan cutoff timestamp, or {@code null} to use the default of 3 days before now
     */
    public void removeOrphanFiles(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            doRemoveOrphanFiles(clientSession, schema, tableName, olderThan);
        }
    }

    /**
     * Collect the names of all files still referenced by the table, then purge unreferenced
     * files from the data location and from the metadata location.
     *
     * @throws PrestoException with {@code ICEBERG_FILESYSTEM_ERROR} when a manifest cannot be read or closed
     */
    private void doRemoveOrphanFiles(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan)
    {
        IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) metadataFactory.create();
        SchemaTableName schemaTableName = new SchemaTableName(schema, tableName);
        Table icebergTable = getIcebergTable(metadata, clientSession, schemaTableName);

        Set<String> processedManifestFilePaths = new HashSet<>();
        ImmutableSet.Builder<String> validMetadataFileNames = ImmutableSet.builder();
        ImmutableSet.Builder<String> validDataFileNames = ImmutableSet.builder();

        for (Snapshot snapshot : icebergTable.snapshots()) {
            if (snapshot.manifestListLocation() != null) {
                validMetadataFileNames.add(extractFileName(snapshot.manifestListLocation()));
            }

            for (ManifestFile manifest : snapshot.allManifests(icebergTable.io())) {
                if (!processedManifestFilePaths.add(manifest.path())) {
                    // Already read this manifest
                    continue;
                }

                validMetadataFileNames.add(extractFileName(manifest.path()));
                try (ManifestReader<? extends ContentFile<?>> manifestReader = readerForManifest(icebergTable, manifest)) {
                    // Both data files and delete files listed in a manifest count as referenced
                    for (ContentFile<?> contentFile : manifestReader) {
                        validDataFileNames.add(extractFileName(contentFile.path().toString()));
                    }
                }
                catch (IOException e) {
                    throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Unable to list manifest file content from " + manifest.path(), e);
                }
            }
        }

        metadataFileLocations(icebergTable, false).stream()
                .map(this::extractFileName)
                .forEach(validMetadataFileNames::add);

        statisticsFilesLocations(icebergTable).stream()
                .map(this::extractFileName)
                .forEach(validMetadataFileNames::add);

        // Always reserve `version-hint.text` as it's a shortcut to find the newest version
        validMetadataFileNames.add("version-hint.text");

        // Remove unused metadata and data files older than 3 days by default
        // This default value is consistent with Spark procedure `remove_orphan_files` on Iceberg, see:
        // https://iceberg.apache.org/docs/1.5.2/spark-procedures/#remove_orphan_files
        long expiration = olderThan == null ?
                System.currentTimeMillis() - DAYS.toMillis(REMOVE_UNUSED_FILES_OLDER_THAN_IN_DAYS) :
                olderThan.isLegacyTimestamp() ? olderThan.getMillisUtc() : olderThan.getMillis();
        scanAndDeleteInvalidFiles(icebergTable, clientSession, schemaTableName, expiration, validDataFileNames.build(), dataLocation(icebergTable));
        scanAndDeleteInvalidFiles(icebergTable, clientSession, schemaTableName, expiration, validMetadataFileNames.build(), metadataLocation(icebergTable));
    }

    /**
     * Recursively list {@code folderFullPath} and delete, in batches of at most
     * {@link #BATCH_DELETE_FILES_COUNT}, every file whose modification time is at or before
     * {@code expiration} and whose name is not contained in {@code validFiles}.
     * Returns without doing anything when the folder does not exist.
     *
     * @param expiration cutoff in epoch milliseconds, compared against the file modification time
     * @param validFiles file names (last path segment) that must be kept
     * @param folderFullPath root folder to scan
     * @throws PrestoException with {@code ICEBERG_FILESYSTEM_ERROR} when listing the folder fails
     */
    private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expiration, Set<String> validFiles, String folderFullPath)
    {
        try {
            List<String> filesToDelete = new ArrayList<>();
            FileSystem fileSystem = getFileSystem(session, this.hdfsEnvironment, schemaTableName, new Path(table.location()));
            Path fullPath = new Path(folderFullPath);
            if (!fileSystem.exists(fullPath)) {
                return;
            }
            RemoteIterator<LocatedFileStatus> allFiles = fileSystem.listFiles(fullPath, true);
            while (allFiles.hasNext()) {
                LocatedFileStatus entry = allFiles.next();
                if (entry.getModificationTime() <= expiration && !validFiles.contains(entry.getPath().getName())) {
                    filesToDelete.add(entry.getPath().toString());
                    // Flush a full batch so the pending list stays bounded while scanning
                    if (filesToDelete.size() >= BATCH_DELETE_FILES_COUNT) {
                        LOGGER.debug("Deleting files while removing orphan files for table %s : %s", schemaTableName, filesToDelete);
                        CatalogUtil.deleteFiles(table.io(), filesToDelete, folderFullPath, true);
                        filesToDelete.clear();
                    }
                }
                else {
                    LOGGER.debug("%s retained while removing orphan files %s", entry.getPath().toString(), schemaTableName.getTableName());
                }
            }

            // Delete whatever is left over from the last, partially filled batch
            if (!filesToDelete.isEmpty()) {
                LOGGER.debug("Deleting files while removing orphan files for table %s : %s", schemaTableName, filesToDelete);
                CatalogUtil.deleteFiles(table.io(), filesToDelete, folderFullPath, true);
            }
        }
        catch (IOException e) {
            throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed accessing data for table: " + schemaTableName, e);
        }
    }

    /**
     * Return the part of {@code path} after its last {@code '/'}; the whole string when it contains none.
     */
    private String extractFileName(String path)
    {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /**
     * Obtain the {@link FileSystem} for {@code location} from the given {@link HdfsEnvironment}.
     *
     * @throws PrestoException with {@code ICEBERG_FILESYSTEM_ERROR} when the file system cannot be obtained
     */
    private static FileSystem getFileSystem(ConnectorSession clientSession, HdfsEnvironment hdfsEnvironment, SchemaTableName schemaTableName, Path location)
    {
        HdfsContext hdfsContext = new HdfsContext(
                clientSession,
                schemaTableName.getSchemaName(),
                schemaTableName.getTableName(),
                location.getName(),
                true);

        try {
            return hdfsEnvironment.getFileSystem(hdfsContext, location);
        }
        catch (Exception e) {
            throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, format("Error getting file system at path %s", location), e);
        }
    }

    /**
     * Open a reader matching the content type of {@code manifest}: a data manifest reader for
     * {@code DATA}, a delete manifest reader for {@code DELETES}.
     *
     * @throws PrestoException with {@code ICEBERG_UNKNOWN_MANIFEST_TYPE} for any other content type
     */
    private static ManifestReader<? extends ContentFile<?>> readerForManifest(Table table, ManifestFile manifest)
    {
        switch (manifest.content()) {
            case DATA:
                return ManifestFiles.read(manifest, table.io());
            case DELETES:
                // Must return here: without it the reader is dropped and control falls through to `default`
                return ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs());
            default:
                throw new PrestoException(ICEBERG_UNKNOWN_MANIFEST_TYPE, "Unknown manifest file content: " + manifest.content());
        }
    }
}
Loading

0 comments on commit 755e834

Please sign in to comment.