Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hive: close the fileIO client when closing the hive catalog #10771

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.iceberg.hive;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -79,6 +83,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
private ClientPool<IMetaStoreClient, TException> clients;
private boolean listAllTables = false;
private Map<String, String> catalogProperties;
private Cache<TableOperations, FileIO> fileIOCloser;

public HiveCatalog() {}

Expand Down Expand Up @@ -111,6 +116,20 @@ public void initialize(String inputName, Map<String, String> properties) {
: CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

this.clients = new CachedClientPool(conf, properties);
this.fileIOCloser = newFileIOCloser();
}

private Cache<TableOperations, FileIO> newFileIOCloser() {
return Caffeine.newBuilder()
.weakKeys()
.removalListener(
(RemovalListener<TableOperations, FileIO>)
(ops, fileIOInstance, cause) -> {
if (null != fileIOInstance) {
fileIOInstance.close();
}
})
.build();
}

@Override
Expand Down Expand Up @@ -512,7 +531,10 @@ private boolean isValidateNamespace(Namespace namespace) {
public TableOperations newTableOps(TableIdentifier tableIdentifier) {
String dbName = tableIdentifier.namespace().level(0);
String tableName = tableIdentifier.name();
return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
HiveTableOperations ops =
new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
fileIOCloser.put(ops, ops.io());
return ops;
amogh-jahagirdar marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down Expand Up @@ -636,6 +658,15 @@ protected Map<String, String> properties() {
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
}

@Override
public void close() throws IOException {
super.close();
if (fileIOCloser != null) {
fileIOCloser.invalidateAll();
fileIOCloser.cleanUp();
}
}

@VisibleForTesting
void setListAllTables(boolean listAllTables) {
this.listAllTables = listAllTables;
Expand Down
Loading