Skip to content

Commit

Permalink
[COMMON] add Utils#close (#1678)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Apr 27, 2023
1 parent 5e758ed commit c9219bf
Show file tree
Hide file tree
Showing 13 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ static List<ConsumerThread> create(
} catch (WakeupException ignore) {
// Stop polling and being ready to clean up
} finally {
Utils.swallowException(consumer::close);
Utils.close(consumer);
closeLatch.countDown();
closed.set(true);
CLIENT_ID_ASSIGNED_PARTITIONS.remove(clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ static List<ProducerThread> create(
throw new RuntimeException(
e + ", The producer thread was prematurely closed.");
} finally {
Utils.swallowException(producer::close);
Utils.close(producer);
closeLatch.countDown();
closed.set(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static Runnable createFileWriter(
Utils.sleep(Duration.ofSeconds(1));
}
} finally {
Utils.packException(writer::close);
Utils.close(writer);
}
};
case JSON:
Expand All @@ -101,7 +101,7 @@ public static Runnable createFileWriter(
Utils.sleep(Duration.ofSeconds(1));
}
} finally {
Utils.packException(writer::close);
Utils.close(writer);
}
};
default:
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/web/WebService.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public int port() {

@Override
public void close() {
Utils.swallowException(admin::close);
Utils.close(admin);
server.stop(3);
}

Expand Down
6 changes: 6 additions & 0 deletions common/src/main/java/org/astraea/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ public static <R> R packException(Getter<R> getter) {
}
}

public static void close(Object obj) {
if (obj instanceof AutoCloseable) {
packException(() -> ((AutoCloseable) obj).close());
}
}

/**
* Convert the exception thrown by getter to RuntimeException. This method can eliminate the
* exception from Java signature.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void skip(int num) {

@Override
public void close() {
Utils.packException(csvReader::close);
Utils.close(csvReader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void flush() {

@Override
public void close() {
Utils.packException(csvWriter::close);
Utils.close(csvWriter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ static MBeanClient of(JMXServiceURL jmxServiceURL) {
jmxServiceURL.getPort()) {
@Override
public void close() {
Utils.packException(jmxConnector::close);
Utils.close(jmxConnector);
}
};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private void updateMetadata() {
} finally {
lock.writeLock().unlock();
}
old.values().forEach(c -> Utils.swallowException(c::close));
old.values().forEach(Utils::close);
});
}

Expand Down Expand Up @@ -292,7 +292,7 @@ public void close() {
closed.set(true);
executor.shutdownNow();
Utils.packException(() -> executor.awaitTermination(30, TimeUnit.SECONDS));
clients.values().forEach(c -> Utils.swallowException(c::close));
clients.values().forEach(Utils::close);
sender.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected void onNewBatch(String topic, int prevPartition, ClusterInfo clusterIn

@Override
public void close() {
if (admin != null) Utils.packException(admin::close);
Utils.close(admin);
}

// -----------------------[interdependent]-----------------------//
Expand Down
11 changes: 11 additions & 0 deletions common/src/test/java/org/astraea/common/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package org.astraea.common;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.common.cost.CostFunction;
Expand Down Expand Up @@ -296,4 +298,13 @@ void testCostFunctions() {
.string("maxMigratedLeader")
.get());
}

@Test
void testClose() {
Assertions.assertDoesNotThrow(() -> Utils.close(null));
var count = new AtomicInteger();
Closeable obj = count::incrementAndGet;
Assertions.assertDoesNotThrow(() -> Utils.close(obj));
Assertions.assertEquals(1, count.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ var record = reader.next();
.headers(record.headers())
.build());
}
Utils.packException(inputStream::close);
Utils.close(inputStream);
switch (cleanSource) {
case "archive":
var archiveInput = Client.read(currentPath);
Expand All @@ -196,7 +196,7 @@ var record = archiveReader.next();
archiveWriter.append(record);
}
archiveWriter.close();
Utils.packException(archiveInput::close);
Utils.close(archiveInput);
case "delete":
Client.delete(currentPath);
case "off":
Expand Down
2 changes: 1 addition & 1 deletion fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,6 @@ public OutputStream write(String path) {

@Override
public void close() {
Utils.packException(fs::close);
Utils.close(fs);
}
}

0 comments on commit c9219bf

Please sign in to comment.