Skip to content

Commit

Permalink
AWS, Core, Hive: Extract FileIO closer into separate class
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Aug 7, 2024
1 parent 257b1d7 commit 332a245
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 69 deletions.
31 changes: 6 additions & 25 deletions aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.iceberg.aws.glue;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -51,7 +48,7 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileIOCloser;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
Expand Down Expand Up @@ -97,7 +94,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
private LockManager lockManager;
private CloseableGroup closeableGroup;
private Map<String, String> catalogProperties;
private Cache<TableOperations, FileIO> fileIOCloser;
private FileIOCloser fileIOCloser;

// Attempt to set versionId if available on the path
private static final DynMethods.UnboundMethod SET_VERSION_ID =
Expand Down Expand Up @@ -194,11 +191,12 @@ void initialize(
this.lockManager = lock;

this.closeableGroup = new CloseableGroup();
this.fileIOCloser = new FileIOCloser();
closeableGroup.addCloseable(glue);
closeableGroup.addCloseable(lockManager);
closeableGroup.addCloseable(metricsReporter());
closeableGroup.addCloseable(fileIOCloser);
closeableGroup.setSuppressCloseFailure(true);
this.fileIOCloser = newFileIOCloser();
}

@Override
Expand Down Expand Up @@ -243,7 +241,7 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
tableSpecificCatalogPropertiesBuilder.buildOrThrow(),
hadoopConf,
tableIdentifier);
fileIOCloser.put(glueTableOperations, glueTableOperations.io());
fileIOCloser.trackFileIO(glueTableOperations);
return glueTableOperations;
}

Expand All @@ -256,7 +254,7 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
catalogProperties,
hadoopConf,
tableIdentifier);
fileIOCloser.put(glueTableOperations, glueTableOperations.io());
fileIOCloser.trackFileIO(glueTableOperations);
return glueTableOperations;
}

Expand Down Expand Up @@ -634,10 +632,6 @@ public String name() {
@Override
public void close() throws IOException {
closeableGroup.close();
if (fileIOCloser != null) {
fileIOCloser.invalidateAll();
fileIOCloser.cleanUp();
}
}

@Override
Expand All @@ -649,17 +643,4 @@ public void setConf(Configuration conf) {
protected Map<String, String> properties() {
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
}

private Cache<TableOperations, FileIO> newFileIOCloser() {
return Caffeine.newBuilder()
.weakKeys()
.removalListener(
(RemovalListener<TableOperations, FileIO>)
(ops, fileIO, cause) -> {
if (null != fileIO) {
fileIO.close();
}
})
.build();
}
}
59 changes: 59 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/FileIOCloser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.io;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.Closeable;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* Keeps track of the {@link FileIO} instance of the given {@link TableOperations} instance and
* closes the {@link FileIO} when {@link FileIOCloser#close()} gets called
*/
public class FileIOCloser implements Closeable {
private final Cache<TableOperations, FileIO> closer;

public FileIOCloser() {
this.closer =
Caffeine.newBuilder()
.weakKeys()
.removalListener(
(RemovalListener<TableOperations, FileIO>)
(ops, fileIO, cause) -> {
if (null != fileIO) {
fileIO.close();
}
})
.build();
}

public void trackFileIO(TableOperations ops) {
Preconditions.checkArgument(null != ops, "Invalid table ops: null");
closer.put(ops, ops.io());
}

@Override
public void close() {
closer.invalidateAll();
closer.cleanUp();
}
}
27 changes: 5 additions & 22 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.Transactions;
import org.apache.iceberg.catalog.BaseViewSessionCatalog;
Expand All @@ -63,6 +62,7 @@
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileIOCloser;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.MetricsReporters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -136,7 +136,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private final BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder;
private Cache<String, AuthSession> sessions = null;
private Cache<String, AuthSession> tableSessions = null;
private Cache<TableOperations, FileIO> fileIOCloser;
private FileIOCloser fileIOCloser = null;
private AuthSession catalogAuth = null;
private boolean keepTokenRefreshed = true;
private RESTClient client = null;
Expand Down Expand Up @@ -268,10 +268,11 @@ public void initialize(String name, Map<String, String> unresolved) {

this.io = newFileIO(SessionContext.createEmpty(), mergedProps);

this.fileIOCloser = newFileIOCloser();
this.fileIOCloser = new FileIOCloser();
this.closeables = new CloseableGroup();
this.closeables.addCloseable(this.io);
this.closeables.addCloseable(this.client);
this.closeables.addCloseable(fileIOCloser);
this.closeables.setSuppressCloseFailure(true);

this.snapshotMode =
Expand Down Expand Up @@ -465,7 +466,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {

private void trackFileIO(RESTTableOperations ops) {
if (io != ops.io()) {
fileIOCloser.put(ops, ops.io());
fileIOCloser.trackFileIO(ops);
}
}

Expand Down Expand Up @@ -641,11 +642,6 @@ public void close() throws IOException {
if (closeables != null) {
closeables.close();
}

if (fileIOCloser != null) {
fileIOCloser.invalidateAll();
fileIOCloser.cleanUp();
}
}

private void shutdownRefreshExecutor() {
Expand Down Expand Up @@ -1088,19 +1084,6 @@ private static Cache<String, AuthSession> newSessionCache(Map<String, String> pr
.build();
}

private Cache<TableOperations, FileIO> newFileIOCloser() {
return Caffeine.newBuilder()
.weakKeys()
.removalListener(
(RemovalListener<TableOperations, FileIO>)
(ops, fileIO, cause) -> {
if (null != fileIO) {
fileIO.close();
}
})
.build();
}

public void commitTransaction(SessionContext context, List<TableCommit> commits) {
List<UpdateTableRequest> tableChanges = Lists.newArrayListWithCapacity(commits.size());

Expand Down
80 changes: 80 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFileIOCloser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.times;

import com.github.benmanes.caffeine.cache.Cache;
import java.io.File;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileIOCloser;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;

public class TestFileIOCloser {

@TempDir private File tableDir;

@SuppressWarnings("resource")
@Test
public void nullTableOps() {
assertThatThrownBy(() -> new FileIOCloser().trackFileIO(null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid table ops: null");
}

@SuppressWarnings("unchecked")
@Test
public void fileIOGetsClosed() throws NoSuchFieldException, IllegalAccessException {
FileIOCloser fileIOCloser = new FileIOCloser();
Field declaredField = fileIOCloser.getClass().getDeclaredField("closer");
declaredField.setAccessible(true);
assertThat(declaredField.get(fileIOCloser)).isInstanceOf(Cache.class);
Cache<TableOperations, FileIO> closer =
((Cache<TableOperations, FileIO>) declaredField.get(fileIOCloser));

FileIO firstFileIO = Mockito.spy(new TestTables.LocalFileIO());
TestTables.TestTableOperations firstOps =
new TestTables.TestTableOperations("x", tableDir, firstFileIO);
fileIOCloser.trackFileIO(firstOps);
assertThat(closer.estimatedSize()).isEqualTo(1);

FileIO secondFileIO = Mockito.spy(new TestTables.LocalFileIO());
TestTables.TestTableOperations secondOps =
new TestTables.TestTableOperations("y", tableDir, secondFileIO);
fileIOCloser.trackFileIO(secondOps);
assertThat(closer.estimatedSize()).isEqualTo(2);

fileIOCloser.close();
Awaitility.await("FileIO gets closed")
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(
() -> {
assertThat(closer.estimatedSize()).isEqualTo(0);
Mockito.verify(firstFileIO, times(1)).close();
Mockito.verify(secondFileIO, times(1)).close();
});
}
}
19 changes: 18 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestTables.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,30 @@ public static class TestTableOperations implements TableOperations {

private final String tableName;
private final File metadata;
private final FileIO fileIO;
private TableMetadata current = null;
private long lastSnapshotId = 0;
private int failCommits = 0;

public TestTableOperations(String tableName, File location) {
this.tableName = tableName;
this.metadata = new File(location, "metadata");
this.fileIO = new LocalFileIO();
metadata.mkdirs();
refresh();
if (current != null) {
for (Snapshot snap : current.snapshots()) {
this.lastSnapshotId = Math.max(lastSnapshotId, snap.snapshotId());
}
} else {
this.lastSnapshotId = 0;
}
}

public TestTableOperations(String tableName, File location, FileIO fileIO) {
this.tableName = tableName;
this.metadata = new File(location, "metadata");
this.fileIO = fileIO;
metadata.mkdirs();
refresh();
if (current != null) {
Expand Down Expand Up @@ -277,7 +294,7 @@ public void commit(TableMetadata base, TableMetadata updatedMetadata) {

@Override
public FileIO io() {
return new LocalFileIO();
return fileIO;
}

@Override
Expand Down
Loading

0 comments on commit 332a245

Please sign in to comment.