From 2bd7109fb0e766623ae85718544544ee58675302 Mon Sep 17 00:00:00 2001 From: serhiish Date: Thu, 26 Nov 2020 16:50:17 +0200 Subject: [PATCH 1/3] Iceberg removes table metadata only from the metastore, but does not remove table data and metadata from storage. This PR adds the ability to also recursively remove table data and metadata from storage. It adds a new configuration property that enables or disables recursive deletion of table data on drop. --- .../plugin/iceberg/IcebergConfig.java | 15 ++++++++++++ .../plugin/iceberg/IcebergMetadata.java | 24 +++++++++++++++++-- .../iceberg/IcebergMetadataFactory.java | 9 ++++--- .../plugin/iceberg/TestIcebergConfig.java | 7 ++++-- 4 files changed, 48 insertions(+), 7 deletions(-) diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergConfig.java index 4354cabce0ca0..17a9c71e3fbfc 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergConfig.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergConfig.java @@ -14,6 +14,7 @@ package io.prestosql.plugin.iceberg; import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; import io.prestosql.plugin.hive.HiveCompressionCodec; import org.apache.iceberg.FileFormat; @@ -26,6 +27,7 @@ public class IcebergConfig { private IcebergFileFormat fileFormat = ORC; private HiveCompressionCodec compressionCodec = GZIP; + private boolean purgeDataOnTableDrop; @NotNull public FileFormat getFileFormat() @@ -52,4 +54,17 @@ public IcebergConfig setCompressionCodec(HiveCompressionCodec compressionCodec) this.compressionCodec = compressionCodec; return this; } + + public boolean isPurgeDataOnTableDrop() + { + return purgeDataOnTableDrop; + } + + @Config("iceberg.delete-files-on-table-drop") + @ConfigDescription("Recursively delete table data and metadata on drop") + public IcebergConfig setPurgeDataOnTableDrop(boolean 
purgeDataOnTableDrop) + { + this.purgeDataOnTableDrop = purgeDataOnTableDrop; + return this; + } } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java index ede4f2c47f03e..c406f9f14ddf9 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java @@ -133,6 +133,7 @@ public class IcebergMetadata private final JsonCodec commitTaskCodec; private final Map> snapshotIds = new ConcurrentHashMap<>(); + private final boolean purgeDataOnDrop; private Transaction transaction; @@ -140,12 +141,14 @@ public IcebergMetadata( HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, - JsonCodec commitTaskCodec) + JsonCodec commitTaskCodec, + boolean purgeDataOnDrop) { this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); + this.purgeDataOnDrop = purgeDataOnDrop; } @Override @@ -504,7 +507,24 @@ public Optional getInfo(ConnectorTableHandle tableHandle) public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { IcebergTableHandle handle = (IcebergTableHandle) tableHandle; - metastore.dropTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), true); + Optional table = metastore.getTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName()); + + metastore.dropTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), false); + if (purgeDataOnDrop && table.isPresent()) { + String tableLocation = table.get().getStorage().getLocation(); + HdfsContext context = new HdfsContext(session, 
handle.getSchemaName(), handle.getTableName()); + deleteDirRecursive(context, hdfsEnvironment, new Path(tableLocation)); + } + } + + private void deleteDirRecursive(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path) + { + try { + hdfsEnvironment.getFileSystem(context, path).delete(path, true); + } + catch (IOException | RuntimeException e) { + log.error(e, "Failed to delete table data files in path: " + path.toString()); + } } @Override diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadataFactory.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadataFactory.java index c2752e01799fc..e79a528d1baf1 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadataFactory.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadataFactory.java @@ -28,6 +28,7 @@ public class IcebergMetadataFactory private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; private final JsonCodec commitTaskCodec; + private final boolean purgeTableDataOnDrop; @Inject public IcebergMetadataFactory( @@ -37,23 +38,25 @@ public IcebergMetadataFactory( TypeManager typeManager, JsonCodec commitTaskDataJsonCodec) { - this(metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec); + this(metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec, config.isPurgeDataOnTableDrop()); } public IcebergMetadataFactory( HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, - JsonCodec commitTaskCodec) + JsonCodec commitTaskCodec, + boolean purgeTableData) { this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); + this.purgeTableDataOnDrop = purgeTableData; } public IcebergMetadata 
create() { - return new IcebergMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec); + return new IcebergMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec, purgeTableDataOnDrop); } } diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergConfig.java index 8704d66afee5f..7f448e25f2c06 100644 --- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergConfig.java +++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergConfig.java @@ -33,7 +33,8 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(IcebergConfig.class) .setFileFormat(ORC) - .setCompressionCodec(GZIP)); + .setCompressionCodec(GZIP) + .setPurgeDataOnTableDrop(false)); } @Test @@ -42,11 +43,13 @@ public void testExplicitPropertyMappings() Map properties = new ImmutableMap.Builder() .put("iceberg.file-format", "Parquet") .put("iceberg.compression-codec", "NONE") + .put("iceberg.delete-files-on-table-drop", "true") .build(); IcebergConfig expected = new IcebergConfig() .setFileFormat(PARQUET) - .setCompressionCodec(HiveCompressionCodec.NONE); + .setCompressionCodec(HiveCompressionCodec.NONE) + .setPurgeDataOnTableDrop(true); assertFullMapping(properties, expected); } From fc791e29278ef5a19442f3e6632151118ca2c2cb Mon Sep 17 00:00:00 2001 From: serhiish Date: Tue, 26 Jan 2021 15:50:09 +0200 Subject: [PATCH 2/3] Conflicts resolved after renaming to Trino --- .../trino/plugin/iceberg/IcebergConfig.java | 15 +++++++++++++ .../trino/plugin/iceberg/IcebergMetadata.java | 22 +++++++++++++++++-- .../iceberg/IcebergMetadataFactory.java | 9 +++++--- .../plugin/iceberg/TestIcebergConfig.java | 7 ++++-- 4 files changed, 46 insertions(+), 7 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 
676d7c1e6d331..550ca20153d00 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; import io.trino.plugin.hive.HiveCompressionCodec; import org.apache.iceberg.FileFormat; @@ -27,6 +28,7 @@ public class IcebergConfig private IcebergFileFormat fileFormat = ORC; private HiveCompressionCodec compressionCodec = GZIP; private boolean useFileSizeFromMetadata = true; + private boolean purgeDataOnTableDrop; @NotNull public FileFormat getFileFormat() @@ -74,4 +76,17 @@ public IcebergConfig setUseFileSizeFromMetadata(boolean useFileSizeFromMetadata) this.useFileSizeFromMetadata = useFileSizeFromMetadata; return this; } + + public boolean isPurgeDataOnTableDrop() + { + return purgeDataOnTableDrop; + } + + @Config("iceberg.delete-files-on-table-drop") + @ConfigDescription("Recursively delete table data and metadata on drop") + public IcebergConfig setPurgeDataOnTableDrop(boolean purgeDataOnTableDrop) + { + this.purgeDataOnTableDrop = purgeDataOnTableDrop; + return this; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 340a1f5923cc1..69461ce02702d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -165,6 +165,7 @@ public class IcebergMetadata private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; private final JsonCodec commitTaskCodec; + private final boolean purgeDataOnDrop; private final Map> snapshotIds = new ConcurrentHashMap<>(); @@ -174,12 +175,14 @@ public IcebergMetadata( HiveMetastore metastore, HdfsEnvironment 
hdfsEnvironment, TypeManager typeManager, - JsonCodec commitTaskCodec) + JsonCodec commitTaskCodec, + boolean purgeDataOnDrop) { this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); + this.purgeDataOnDrop = purgeDataOnDrop; } @Override @@ -606,7 +609,14 @@ public Optional getInfo(ConnectorTableHandle tableHandle) public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { IcebergTableHandle handle = (IcebergTableHandle) tableHandle; - metastore.dropTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), true); + Optional
table = metastore.getTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName()); + + metastore.dropTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), false); + if (purgeDataOnDrop && table.isPresent()) { + String tableLocation = table.get().getStorage().getLocation(); + HdfsContext context = new HdfsContext(session, handle.getSchemaName(), handle.getTableName()); + deleteDirRecursive(context, hdfsEnvironment, new Path(tableLocation)); + } } @Override @@ -1074,6 +1084,14 @@ private Map> getMaterializedViewToken(ConnectorSess return viewToken; } + private void deleteDirRecursive(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path) { + try { + hdfsEnvironment.getFileSystem(context, path).delete(path, true); + } catch (IOException | RuntimeException e) { + log.error(e, "Failed to delete table data files in path: " + path.toString()); + } + } + private static class TableToken { // Current Snapshot ID of the table diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java index 64fdfdb911ed4..7678f4ce8c0b3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java @@ -28,6 +28,7 @@ public class IcebergMetadataFactory private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; private final JsonCodec commitTaskCodec; + private final boolean purgeTableDataOnDrop; @Inject public IcebergMetadataFactory( @@ -37,23 +38,25 @@ public IcebergMetadataFactory( TypeManager typeManager, JsonCodec commitTaskDataJsonCodec) { - this(metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec); + this(metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec, config.isPurgeDataOnTableDrop()); } public IcebergMetadataFactory( 
HiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, - JsonCodec commitTaskCodec) + JsonCodec commitTaskCodec, + boolean purgeTableData) { this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); + this.purgeTableDataOnDrop = purgeTableData; } public IcebergMetadata create() { - return new IcebergMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec); + return new IcebergMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec, purgeTableDataOnDrop); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index ec8cd479f8974..634c2263bcd6f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -34,7 +34,8 @@ public void testDefaults() assertRecordedDefaults(recordDefaults(IcebergConfig.class) .setFileFormat(ORC) .setCompressionCodec(GZIP) - .setUseFileSizeFromMetadata(true)); + .setUseFileSizeFromMetadata(true) + .setPurgeDataOnTableDrop(false)); } @Test @@ -44,12 +45,14 @@ public void testExplicitPropertyMappings() .put("iceberg.file-format", "Parquet") .put("iceberg.compression-codec", "NONE") .put("iceberg.use-file-size-from-metadata", "false") + .put("iceberg.delete-files-on-table-drop", "true") .build(); IcebergConfig expected = new IcebergConfig() .setFileFormat(PARQUET) .setCompressionCodec(HiveCompressionCodec.NONE) - .setUseFileSizeFromMetadata(false); + .setUseFileSizeFromMetadata(false) + .setPurgeDataOnTableDrop(true); assertFullMapping(properties, expected); } From 520ebe3e910bf8e342ed3017b3f436bb102d751a Mon 
Sep 17 00:00:00 2001 From: serhiish Date: Wed, 27 Jan 2021 10:02:55 +0200 Subject: [PATCH 3/3] Conflicts resolved after renaming to Trino --- .../main/java/io/trino/plugin/iceberg/IcebergMetadata.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 69461ce02702d..2a94020337d3b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -1084,10 +1084,12 @@ private Map> getMaterializedViewToken(ConnectorSess return viewToken; } - private void deleteDirRecursive(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path) { + private void deleteDirRecursive(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path) + { try { hdfsEnvironment.getFileSystem(context, path).delete(path, true); - } catch (IOException | RuntimeException e) { + } + catch (IOException | RuntimeException e) { log.error(e, "Failed to delete table data files in path: " + path.toString()); } }