Added ability to recursively delete table data in Iceberg #6108

Closed · wants to merge 5 commits
IcebergConfig.java
@@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.trino.plugin.hive.HiveCompressionCodec;
import org.apache.iceberg.FileFormat;

@@ -27,6 +28,7 @@ public class IcebergConfig
private IcebergFileFormat fileFormat = ORC;
private HiveCompressionCodec compressionCodec = GZIP;
private boolean useFileSizeFromMetadata = true;
private boolean purgeDataOnTableDrop;

@NotNull
public FileFormat getFileFormat()
@@ -74,4 +76,17 @@ public IcebergConfig setUseFileSizeFromMetadata(boolean useFileSizeFromMetadata)
this.useFileSizeFromMetadata = useFileSizeFromMetadata;
return this;
}

public boolean isPurgeDataOnTableDrop()
{
return purgeDataOnTableDrop;
}

@Config("iceberg.delete-files-on-table-drop")
@ConfigDescription("Recursively delete table data and metadata on drop")
public IcebergConfig setPurgeDataOnTableDrop(boolean purgeDataOnTableDrop)
{
this.purgeDataOnTableDrop = purgeDataOnTableDrop;
return this;
}
}
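
For context, the new property would be enabled in the Iceberg catalog properties file alongside the other iceberg.* settings exercised in the config test further down. A minimal sketch, assuming a conventional catalog file layout; the file path and metastore URI below are illustrative, not part of this PR:

# etc/catalog/iceberg.properties (illustrative path)
connector.name=iceberg
hive.metastore.uri=thrift://example-metastore:9083
# property added by this PR; defaults to false
iceberg.delete-files-on-table-drop=true
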
IcebergMetadata.java
@@ -165,6 +165,7 @@ public class IcebergMetadata
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final boolean purgeDataOnDrop;

private final Map<String, Optional<Long>> snapshotIds = new ConcurrentHashMap<>();

@@ -174,12 +175,14 @@ public IcebergMetadata(
HiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec)
JsonCodec<CommitTaskData> commitTaskCodec,
boolean purgeDataOnDrop)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.purgeDataOnDrop = purgeDataOnDrop;
}

@Override
@@ -606,7 +609,14 @@ public Optional<Object> getInfo(ConnectorTableHandle tableHandle)
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
metastore.dropTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), true);
Optional<Table> table = metastore.getTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName());

metastore.dropTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), false);
if (purgeDataOnDrop && table.isPresent()) {
Review comment (Member): What if the drop happens in the middle of a transaction and the transaction is aborted? Deleting these files has to be done only when we commit those changes. (A sketch of that ordering follows this file's diff.)

String tableLocation = table.get().getStorage().getLocation();
HdfsContext context = new HdfsContext(session, handle.getSchemaName(), handle.getTableName());
deleteDirRecursive(context, hdfsEnvironment, new Path(tableLocation));
}
}

@Override
@@ -1074,6 +1084,16 @@ private Map<String, Optional<TableToken>> getMaterializedViewToken(ConnectorSession
return viewToken;
}

private void deleteDirRecursive(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path)
{
try {
hdfsEnvironment.getFileSystem(context, path).delete(path, true);
}
catch (IOException | RuntimeException e) {
log.error(e, "Failed to delete table data files in path: " + path.toString());
}
}

private static class TableToken
{
// Current Snapshot ID of the table
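
The review comment above asks that the data files be removed only once the metastore drop has actually committed, so an aborted transaction does not lose files. A minimal sketch of that ordering, using only the Hadoop FileSystem API the PR already relies on; the class and method names are hypothetical, not existing Trino or Iceberg APIs:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: record table locations while the drop is pending and
// only delete them after the metastore change has committed, so an aborted
// transaction never loses data files.
final class DeferredPurge
{
    private final List<Path> pendingDeletes = new ArrayList<>();

    // Called from dropTable(): remember the location instead of deleting it inline
    void schedule(Path tableLocation)
    {
        pendingDeletes.add(tableLocation);
    }

    // Called only after the metastore drop has committed successfully
    void purge(FileSystem fileSystem)
            throws IOException
    {
        for (Path path : pendingDeletes) {
            // same recursive delete the PR issues in deleteDirRecursive()
            fileSystem.delete(path, true);
        }
        pendingDeletes.clear();
    }

    // Called when the transaction is aborted; the data files are left untouched
    void abort()
    {
        pendingDeletes.clear();
    }
}

In the connector itself this bookkeeping would live wherever the commit and abort hooks of the metadata transaction are handled, rather than in a standalone class; the sketch only illustrates the delete-after-commit ordering the reviewer is asking for.
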
IcebergMetadataFactory.java
@@ -28,6 +28,7 @@ public class IcebergMetadataFactory
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final boolean purgeTableDataOnDrop;

@Inject
public IcebergMetadataFactory(
@@ -37,23 +38,25 @@ public IcebergMetadataFactory(
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskDataJsonCodec)
{
this(metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec);
this(metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec, config.isPurgeDataOnTableDrop());
}

public IcebergMetadataFactory(
HiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec)
JsonCodec<CommitTaskData> commitTaskCodec,
boolean purgeTableData)
Suggested change (Member): rename the constructor parameter from purgeTableData to purgeTableDataOnDrop, matching the field it initializes.

{
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.purgeTableDataOnDrop = purgeTableData;
}

public IcebergMetadata create()
{
return new IcebergMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec);
return new IcebergMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec, purgeTableDataOnDrop);
}
}
TestIcebergConfig.java
@@ -34,7 +34,8 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(IcebergConfig.class)
.setFileFormat(ORC)
.setCompressionCodec(GZIP)
.setUseFileSizeFromMetadata(true));
.setUseFileSizeFromMetadata(true)
.setPurgeDataOnTableDrop(false));
}

@Test
@@ -44,12 +45,14 @@ public void testExplicitPropertyMappings()
.put("iceberg.file-format", "Parquet")
.put("iceberg.compression-codec", "NONE")
.put("iceberg.use-file-size-from-metadata", "false")
.put("iceberg.delete-files-on-table-drop", "true")
.build();

IcebergConfig expected = new IcebergConfig()
.setFileFormat(PARQUET)
.setCompressionCodec(HiveCompressionCodec.NONE)
.setUseFileSizeFromMetadata(false);
.setUseFileSizeFromMetadata(false)
.setPurgeDataOnTableDrop(true);

assertFullMapping(properties, expected);
}