diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index 303e5cfa51..3b65aec33b 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -129,7 +129,7 @@ static List create( } catch (WakeupException ignore) { // Stop polling and being ready to clean up } finally { - Utils.swallowException(consumer::close); + Utils.close(consumer); closeLatch.countDown(); closed.set(true); CLIENT_ID_ASSIGNED_PARTITIONS.remove(clientId); diff --git a/app/src/main/java/org/astraea/app/performance/ProducerThread.java b/app/src/main/java/org/astraea/app/performance/ProducerThread.java index 170ce1e3c2..1218c99055 100644 --- a/app/src/main/java/org/astraea/app/performance/ProducerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ProducerThread.java @@ -118,7 +118,7 @@ static List create( throw new RuntimeException( e + ", The producer thread was prematurely closed."); } finally { - Utils.swallowException(producer::close); + Utils.close(producer); closeLatch.countDown(); closed.set(true); } diff --git a/app/src/main/java/org/astraea/app/performance/ReportFormat.java b/app/src/main/java/org/astraea/app/performance/ReportFormat.java index 9bd59e36b2..667ef0f000 100644 --- a/app/src/main/java/org/astraea/app/performance/ReportFormat.java +++ b/app/src/main/java/org/astraea/app/performance/ReportFormat.java @@ -89,7 +89,7 @@ public static Runnable createFileWriter( Utils.sleep(Duration.ofSeconds(1)); } } finally { - Utils.packException(writer::close); + Utils.close(writer); } }; case JSON: @@ -101,7 +101,7 @@ public static Runnable createFileWriter( Utils.sleep(Duration.ofSeconds(1)); } } finally { - Utils.packException(writer::close); + Utils.close(writer); } }; default: diff --git a/app/src/main/java/org/astraea/app/web/WebService.java b/app/src/main/java/org/astraea/app/web/WebService.java index e56316ce53..acbeddeb9a 100644 --- a/app/src/main/java/org/astraea/app/web/WebService.java +++ b/app/src/main/java/org/astraea/app/web/WebService.java @@ -98,7 +98,7 @@ public int port() { @Override public void close() { - Utils.swallowException(admin::close); + Utils.close(admin); server.stop(3); } diff --git a/common/src/main/java/org/astraea/common/Utils.java b/common/src/main/java/org/astraea/common/Utils.java index 6c36949762..8eb0edaa45 100644 --- a/common/src/main/java/org/astraea/common/Utils.java +++ b/common/src/main/java/org/astraea/common/Utils.java @@ -144,6 +144,12 @@ public static R packException(Getter getter) { } } + public static void close(Object obj) { + if (obj instanceof AutoCloseable) { + packException(() -> ((AutoCloseable) obj).close()); + } + } + /** * Convert the exception thrown by getter to RuntimeException. This method can eliminate the * exception from Java signature. diff --git a/common/src/main/java/org/astraea/common/csv/CsvReaderBuilder.java b/common/src/main/java/org/astraea/common/csv/CsvReaderBuilder.java index a1999c04b4..6cbfdb1969 100644 --- a/common/src/main/java/org/astraea/common/csv/CsvReaderBuilder.java +++ b/common/src/main/java/org/astraea/common/csv/CsvReaderBuilder.java @@ -120,7 +120,7 @@ public void skip(int num) { @Override public void close() { - Utils.packException(csvReader::close); + Utils.close(csvReader); } } } diff --git a/common/src/main/java/org/astraea/common/csv/CsvWriterBuilder.java b/common/src/main/java/org/astraea/common/csv/CsvWriterBuilder.java index b6e46fedbc..513a06ec53 100644 --- a/common/src/main/java/org/astraea/common/csv/CsvWriterBuilder.java +++ b/common/src/main/java/org/astraea/common/csv/CsvWriterBuilder.java @@ -84,7 +84,7 @@ public void flush() { @Override public void close() { - Utils.packException(csvWriter::close); + Utils.close(csvWriter); } } } diff --git a/common/src/main/java/org/astraea/common/metrics/MBeanClient.java b/common/src/main/java/org/astraea/common/metrics/MBeanClient.java index a0e1fbcdd2..819ea2c053 100644 --- a/common/src/main/java/org/astraea/common/metrics/MBeanClient.java +++ b/common/src/main/java/org/astraea/common/metrics/MBeanClient.java @@ -124,7 +124,7 @@ static MBeanClient of(JMXServiceURL jmxServiceURL) { jmxServiceURL.getPort()) { @Override public void close() { - Utils.packException(jmxConnector::close); + Utils.close(jmxConnector); } }; }); diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java index f4d7816e6a..b0d4908122 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java @@ -258,7 +258,7 @@ private void updateMetadata() { } finally { lock.writeLock().unlock(); } - old.values().forEach(c -> Utils.swallowException(c::close)); + old.values().forEach(Utils::close); }); } @@ -292,7 +292,7 @@ public void close() { closed.set(true); executor.shutdownNow(); Utils.packException(() -> executor.awaitTermination(30, TimeUnit.SECONDS)); - clients.values().forEach(c -> Utils.swallowException(c::close)); + clients.values().forEach(Utils::close); sender.close(); } } diff --git a/common/src/main/java/org/astraea/common/partitioner/Partitioner.java b/common/src/main/java/org/astraea/common/partitioner/Partitioner.java index fb822697a2..c43d6b5a0d 100644 --- a/common/src/main/java/org/astraea/common/partitioner/Partitioner.java +++ b/common/src/main/java/org/astraea/common/partitioner/Partitioner.java @@ -60,7 +60,7 @@ protected void onNewBatch(String topic, int prevPartition, ClusterInfo clusterIn @Override public void close() { - if (admin != null) Utils.packException(admin::close); + Utils.close(admin); } // -----------------------[interdependent]-----------------------// diff --git a/common/src/test/java/org/astraea/common/UtilsTest.java b/common/src/test/java/org/astraea/common/UtilsTest.java index b785a49e9f..f8d8e7d8d8 100644 --- a/common/src/test/java/org/astraea/common/UtilsTest.java +++ b/common/src/test/java/org/astraea/common/UtilsTest.java @@ -16,6 +16,7 @@ */ package org.astraea.common; +import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -23,6 +24,7 @@ import java.util.SortedMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import java.util.stream.Stream; import org.astraea.common.cost.CostFunction; @@ -296,4 +298,13 @@ void testCostFunctions() { .string("maxMigratedLeader") .get()); } + + @Test + void testClose() { + Assertions.assertDoesNotThrow(() -> Utils.close(null)); + var count = new AtomicInteger(); + Closeable obj = count::incrementAndGet; + Assertions.assertDoesNotThrow(() -> Utils.close(obj)); + Assertions.assertEquals(1, count.get()); + } } diff --git a/connector/src/main/java/org/astraea/connector/backup/Importer.java b/connector/src/main/java/org/astraea/connector/backup/Importer.java index c85bbc2fdc..7267aa1652 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Importer.java +++ b/connector/src/main/java/org/astraea/connector/backup/Importer.java @@ -181,7 +181,7 @@ var record = reader.next(); .headers(record.headers()) .build()); } - Utils.packException(inputStream::close); + Utils.close(inputStream); switch (cleanSource) { case "archive": var archiveInput = Client.read(currentPath); @@ -196,7 +196,7 @@ var record = archiveReader.next(); archiveWriter.append(record); } archiveWriter.close(); - Utils.packException(archiveInput::close); + Utils.close(archiveInput); case "delete": Client.delete(currentPath); case "off": diff --git a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java index 5aec508247..08973516ff 100644 --- a/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java +++ b/fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java @@ -144,6 +144,6 @@ public OutputStream write(String path) { @Override public void close() { - Utils.packException(fs::close); + Utils.close(fs); } }