From 7e2920af400e5d559f8f1742b1043787b0477d74 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Thu, 25 Jul 2024 19:50:19 +0200 Subject: [PATCH] Hive: close the fileIO client when closing the hive catalog (#10771) Co-authored-by: Amogh Jahagirdar Co-authored-by: Eduard Tudenhoefner --- .../org/apache/iceberg/hive/HiveCatalog.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index b4f49e29fc49..8944cf93947b 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -18,6 +18,10 @@ */ package org.apache.iceberg.hive; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -79,6 +83,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa private ClientPool clients; private boolean listAllTables = false; private Map catalogProperties; + private Cache fileIOCloser; public HiveCatalog() {} @@ -111,6 +116,20 @@ public void initialize(String inputName, Map properties) { : CatalogUtil.loadFileIO(fileIOImpl, properties, conf); this.clients = new CachedClientPool(conf, properties); + this.fileIOCloser = newFileIOCloser(); + } + + private Cache newFileIOCloser() { + return Caffeine.newBuilder() + .weakKeys() + .removalListener( + (RemovalListener) + (ops, fileIOInstance, cause) -> { + if (null != fileIOInstance) { + fileIOInstance.close(); + } + }) + .build(); } @Override @@ -512,7 +531,10 @@ private boolean isValidateNamespace(Namespace namespace) { public TableOperations newTableOps(TableIdentifier tableIdentifier) { String dbName = tableIdentifier.namespace().level(0); String tableName = tableIdentifier.name(); - return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName); + HiveTableOperations ops = + new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName); + fileIOCloser.put(ops, ops.io()); + return ops; } @Override @@ -636,6 +658,15 @@ protected Map properties() { return catalogProperties == null ? ImmutableMap.of() : catalogProperties; } + @Override + public void close() throws IOException { + super.close(); + if (fileIOCloser != null) { + fileIOCloser.invalidateAll(); + fileIOCloser.cleanUp(); + } + } + @VisibleForTesting void setListAllTables(boolean listAllTables) { this.listAllTables = listAllTables;