diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index b4f49e29fc49..8944cf93947b 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -18,6 +18,10 @@ */ package org.apache.iceberg.hive; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -79,6 +83,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa private ClientPool clients; private boolean listAllTables = false; private Map catalogProperties; + private Cache fileIOCloser; public HiveCatalog() {} @@ -111,6 +116,20 @@ public void initialize(String inputName, Map properties) { : CatalogUtil.loadFileIO(fileIOImpl, properties, conf); this.clients = new CachedClientPool(conf, properties); + this.fileIOCloser = newFileIOCloser(); + } + + private Cache newFileIOCloser() { + return Caffeine.newBuilder() + .weakKeys() + .removalListener( + (RemovalListener) + (ops, fileIOInstance, cause) -> { + if (null != fileIOInstance) { + fileIOInstance.close(); + } + }) + .build(); } @Override @@ -512,7 +531,10 @@ private boolean isValidateNamespace(Namespace namespace) { public TableOperations newTableOps(TableIdentifier tableIdentifier) { String dbName = tableIdentifier.namespace().level(0); String tableName = tableIdentifier.name(); - return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName); + HiveTableOperations ops = + new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName); + fileIOCloser.put(ops, ops.io()); + return ops; } @Override @@ -636,6 +658,15 @@ protected Map properties() { return catalogProperties == null ? ImmutableMap.of() : catalogProperties; } + @Override + public void close() throws IOException { + super.close(); + if (fileIOCloser != null) { + fileIOCloser.invalidateAll(); + fileIOCloser.cleanUp(); + } + } + @VisibleForTesting void setListAllTables(boolean listAllTables) { this.listAllTables = listAllTables;