Skip to content

Commit

Permalink
Hive: close the fileIO client when closing the hive catalog (#10771)
Browse files Browse the repository at this point in the history
Co-authored-by: Amogh Jahagirdar <[email protected]>
Co-authored-by: Eduard Tudenhoefner <[email protected]>
  • Loading branch information
3 people authored Jul 25, 2024
1 parent 5da64e0 commit 7e2920a
Showing 1 changed file with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.iceberg.hive;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -79,6 +83,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
private ClientPool<IMetaStoreClient, TException> clients;
private boolean listAllTables = false;
private Map<String, String> catalogProperties;
private Cache<TableOperations, FileIO> fileIOCloser;

public HiveCatalog() {}

Expand Down Expand Up @@ -111,6 +116,20 @@ public void initialize(String inputName, Map<String, String> properties) {
: CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

this.clients = new CachedClientPool(conf, properties);
this.fileIOCloser = newFileIOCloser();
}

private Cache<TableOperations, FileIO> newFileIOCloser() {
return Caffeine.newBuilder()
.weakKeys()
.removalListener(
(RemovalListener<TableOperations, FileIO>)
(ops, fileIOInstance, cause) -> {
if (null != fileIOInstance) {
fileIOInstance.close();
}
})
.build();
}

@Override
Expand Down Expand Up @@ -512,7 +531,10 @@ private boolean isValidateNamespace(Namespace namespace) {
public TableOperations newTableOps(TableIdentifier tableIdentifier) {
String dbName = tableIdentifier.namespace().level(0);
String tableName = tableIdentifier.name();
return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
HiveTableOperations ops =
new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
fileIOCloser.put(ops, ops.io());
return ops;
}

@Override
Expand Down Expand Up @@ -636,6 +658,15 @@ protected Map<String, String> properties() {
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
}

@Override
public void close() throws IOException {
super.close();
if (fileIOCloser != null) {
fileIOCloser.invalidateAll();
fileIOCloser.cleanUp();
}
}

@VisibleForTesting
void setListAllTables(boolean listAllTables) {
this.listAllTables = listAllTables;
Expand Down

0 comments on commit 7e2920a

Please sign in to comment.