From 332a245ec76d8bf26e3b6a1ca08ea213c7ed867b Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Wed, 7 Aug 2024 10:36:55 +0200 Subject: [PATCH] AWS, Core, Hive: Extract FileIO closer into separate class --- .../apache/iceberg/aws/glue/GlueCatalog.java | 31 ++----- .../org/apache/iceberg/io/FileIOCloser.java | 59 ++++++++++++++ .../iceberg/rest/RESTSessionCatalog.java | 27 ++----- .../org/apache/iceberg/TestFileIOCloser.java | 80 +++++++++++++++++++ .../java/org/apache/iceberg/TestTables.java | 19 ++++- .../org/apache/iceberg/hive/HiveCatalog.java | 26 ++---- 6 files changed, 173 insertions(+), 69 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/io/FileIOCloser.java create mode 100644 core/src/test/java/org/apache/iceberg/TestFileIOCloser.java diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java index c6b157bb5c79..c441eedfa5b8 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java @@ -18,9 +18,6 @@ */ package org.apache.iceberg.aws.glue; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; import java.util.List; import java.util.Map; @@ -51,7 +48,7 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.CloseableGroup; -import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOCloser; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; @@ -97,7 +94,7 @@ public class GlueCatalog extends BaseMetastoreCatalog private LockManager lockManager; private CloseableGroup closeableGroup; private Map catalogProperties; - private Cache fileIOCloser; + private FileIOCloser fileIOCloser; // Attempt to set versionId if available on the path private static final DynMethods.UnboundMethod SET_VERSION_ID = @@ -194,11 +191,12 @@ void initialize( this.lockManager = lock; this.closeableGroup = new CloseableGroup(); + this.fileIOCloser = new FileIOCloser(); closeableGroup.addCloseable(glue); closeableGroup.addCloseable(lockManager); closeableGroup.addCloseable(metricsReporter()); + closeableGroup.addCloseable(fileIOCloser); closeableGroup.setSuppressCloseFailure(true); - this.fileIOCloser = newFileIOCloser(); } @Override @@ -243,7 +241,7 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) { tableSpecificCatalogPropertiesBuilder.buildOrThrow(), hadoopConf, tableIdentifier); - fileIOCloser.put(glueTableOperations, glueTableOperations.io()); + fileIOCloser.trackFileIO(glueTableOperations); return glueTableOperations; } @@ -256,7 +254,7 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) { catalogProperties, hadoopConf, tableIdentifier); - fileIOCloser.put(glueTableOperations, glueTableOperations.io()); + fileIOCloser.trackFileIO(glueTableOperations); return glueTableOperations; } @@ -634,10 +632,6 @@ public String name() { @Override public void close() throws IOException { closeableGroup.close(); - if (fileIOCloser != null) { - fileIOCloser.invalidateAll(); - fileIOCloser.cleanUp(); - } } @Override @@ -649,17 +643,4 @@ public void setConf(Configuration conf) { protected Map properties() { return catalogProperties == null ? ImmutableMap.of() : catalogProperties; } - - private Cache newFileIOCloser() { - return Caffeine.newBuilder() - .weakKeys() - .removalListener( - (RemovalListener) - (ops, fileIO, cause) -> { - if (null != fileIO) { - fileIO.close(); - } - }) - .build(); - } } diff --git a/core/src/main/java/org/apache/iceberg/io/FileIOCloser.java b/core/src/main/java/org/apache/iceberg/io/FileIOCloser.java new file mode 100644 index 000000000000..105fc10e8c0e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FileIOCloser.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.io.Closeable; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Keeps track of the {@link FileIO} instance of the given {@link TableOperations} instance and + * closes the {@link FileIO} when {@link FileIOCloser#close()} gets called + */ +public class FileIOCloser implements Closeable { + private final Cache closer; + + public FileIOCloser() { + this.closer = + Caffeine.newBuilder() + .weakKeys() + .removalListener( + (RemovalListener) + (ops, fileIO, cause) -> { + if (null != fileIO) { + fileIO.close(); + } + }) + .build(); + } + + public void trackFileIO(TableOperations ops) { + Preconditions.checkArgument(null != ops, "Invalid table ops: null"); + closer.put(ops, ops.io()); + } + + @Override + public void close() { + closer.invalidateAll(); + closer.cleanUp(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 1c607e3b0220..e6cc52624292 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -47,7 +47,6 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; import org.apache.iceberg.Transactions; import org.apache.iceberg.catalog.BaseViewSessionCatalog; @@ -63,6 +62,7 @@ import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOCloser; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.metrics.MetricsReporters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -136,7 +136,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private final BiFunction, FileIO> ioBuilder; private Cache sessions = null; private Cache tableSessions = null; - private Cache fileIOCloser; + private FileIOCloser fileIOCloser = null; private AuthSession catalogAuth = null; private boolean keepTokenRefreshed = true; private RESTClient client = null; @@ -268,10 +268,11 @@ public void initialize(String name, Map unresolved) { this.io = newFileIO(SessionContext.createEmpty(), mergedProps); - this.fileIOCloser = newFileIOCloser(); + this.fileIOCloser = new FileIOCloser(); this.closeables = new CloseableGroup(); this.closeables.addCloseable(this.io); this.closeables.addCloseable(this.client); + this.closeables.addCloseable(fileIOCloser); this.closeables.setSuppressCloseFailure(true); this.snapshotMode = @@ -465,7 +466,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { private void trackFileIO(RESTTableOperations ops) { if (io != ops.io()) { - fileIOCloser.put(ops, ops.io()); + fileIOCloser.trackFileIO(ops); } } @@ -641,11 +642,6 @@ public void close() throws IOException { if (closeables != null) { closeables.close(); } - - if (fileIOCloser != null) { - fileIOCloser.invalidateAll(); - fileIOCloser.cleanUp(); - } } private void shutdownRefreshExecutor() { @@ -1088,19 +1084,6 @@ private static Cache newSessionCache(Map pr .build(); } - private Cache newFileIOCloser() { - return Caffeine.newBuilder() - .weakKeys() - .removalListener( - (RemovalListener) - (ops, fileIO, cause) -> { - if (null != fileIO) { - fileIO.close(); - } - }) - .build(); - } - public void commitTransaction(SessionContext context, List commits) { List tableChanges = Lists.newArrayListWithCapacity(commits.size()); diff --git a/core/src/test/java/org/apache/iceberg/TestFileIOCloser.java b/core/src/test/java/org/apache/iceberg/TestFileIOCloser.java new file mode 100644 index 000000000000..85ff1ff3d427 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestFileIOCloser.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.times; + +import com.github.benmanes.caffeine.cache.Cache; +import java.io.File; +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOCloser; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; + +public class TestFileIOCloser { + + @TempDir private File tableDir; + + @SuppressWarnings("resource") + @Test + public void nullTableOps() { + assertThatThrownBy(() -> new FileIOCloser().trackFileIO(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid table ops: null"); + } + + @SuppressWarnings("unchecked") + @Test + public void fileIOGetsClosed() throws NoSuchFieldException, IllegalAccessException { + FileIOCloser fileIOCloser = new FileIOCloser(); + Field declaredField = fileIOCloser.getClass().getDeclaredField("closer"); + declaredField.setAccessible(true); + assertThat(declaredField.get(fileIOCloser)).isInstanceOf(Cache.class); + Cache closer = + ((Cache) declaredField.get(fileIOCloser)); + + FileIO firstFileIO = Mockito.spy(new TestTables.LocalFileIO()); + TestTables.TestTableOperations firstOps = + new TestTables.TestTableOperations("x", tableDir, firstFileIO); + fileIOCloser.trackFileIO(firstOps); + assertThat(closer.estimatedSize()).isEqualTo(1); + + FileIO secondFileIO = Mockito.spy(new TestTables.LocalFileIO()); + TestTables.TestTableOperations secondOps = + new TestTables.TestTableOperations("y", tableDir, secondFileIO); + fileIOCloser.trackFileIO(secondOps); + assertThat(closer.estimatedSize()).isEqualTo(2); + + fileIOCloser.close(); + Awaitility.await("FileIO gets closed") + .atMost(5, TimeUnit.SECONDS) + .untilAsserted( + () -> { + assertThat(closer.estimatedSize()).isEqualTo(0); + Mockito.verify(firstFileIO, times(1)).close(); + Mockito.verify(secondFileIO, times(1)).close(); + }); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java index de05e85c3c77..7c2f9eb862d3 100644 --- a/core/src/test/java/org/apache/iceberg/TestTables.java +++ b/core/src/test/java/org/apache/iceberg/TestTables.java @@ -216,6 +216,7 @@ public static class TestTableOperations implements TableOperations { private final String tableName; private final File metadata; + private final FileIO fileIO; private TableMetadata current = null; private long lastSnapshotId = 0; private int failCommits = 0; @@ -223,6 +224,22 @@ public static class TestTableOperations implements TableOperations { public TestTableOperations(String tableName, File location) { this.tableName = tableName; this.metadata = new File(location, "metadata"); + this.fileIO = new LocalFileIO(); + metadata.mkdirs(); + refresh(); + if (current != null) { + for (Snapshot snap : current.snapshots()) { + this.lastSnapshotId = Math.max(lastSnapshotId, snap.snapshotId()); + } + } else { + this.lastSnapshotId = 0; + } + } + + public TestTableOperations(String tableName, File location, FileIO fileIO) { + this.tableName = tableName; + this.metadata = new File(location, "metadata"); + this.fileIO = fileIO; metadata.mkdirs(); refresh(); if (current != null) { @@ -277,7 +294,7 @@ public void commit(TableMetadata base, TableMetadata updatedMetadata) { @Override public FileIO io() { - return new LocalFileIO(); + return fileIO; } @Override diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 8944cf93947b..d628dcaca709 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -18,9 +18,6 @@ */ package org.apache.iceberg.hive; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; import java.util.List; import java.util.Map; @@ -53,6 +50,7 @@ import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOCloser; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -83,7 +81,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa private ClientPool clients; private boolean listAllTables = false; private Map catalogProperties; - private Cache fileIOCloser; + private FileIOCloser fileIOCloser; public HiveCatalog() {} @@ -116,20 +114,7 @@ public void initialize(String inputName, Map properties) { : CatalogUtil.loadFileIO(fileIOImpl, properties, conf); this.clients = new CachedClientPool(conf, properties); - this.fileIOCloser = newFileIOCloser(); - } - - private Cache newFileIOCloser() { - return Caffeine.newBuilder() - .weakKeys() - .removalListener( - (RemovalListener) - (ops, fileIOInstance, cause) -> { - if (null != fileIOInstance) { - fileIOInstance.close(); - } - }) - .build(); + this.fileIOCloser = new FileIOCloser(); } @Override @@ -533,7 +518,7 @@ public TableOperations newTableOps(TableIdentifier tableIdentifier) { String tableName = tableIdentifier.name(); HiveTableOperations ops = new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName); - fileIOCloser.put(ops, ops.io()); + fileIOCloser.trackFileIO(ops); return ops; } @@ -662,8 +647,7 @@ protected Map properties() { public void close() throws IOException { super.close(); if (fileIOCloser != null) { - fileIOCloser.invalidateAll(); - fileIOCloser.cleanUp(); + fileIOCloser.close(); } }