Skip to content

Commit

Permalink
improve shutdown impl to work on java 17+ (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
pwinckles authored Jul 21, 2022
1 parent 9922628 commit 121f78a
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<jena.patch.version>4.7.2</jena.patch.version>
<junit.version>4.13.1</junit.version>
<logback.version>1.2.0</logback.version>
<mockito.version>1.10.8</mockito.version>
<mockito.version>4.6.1</mockito.version>
<shade.plugin.version>2.4.3</shade.plugin.version>
<slf4j.version>1.7.10</slf4j.version>
<cargo.version>1.4.12</cargo.version>
Expand Down
45 changes: 28 additions & 17 deletions src/main/java/org/fcrepo/importexport/exporter/Exporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
Expand All @@ -74,12 +73,12 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -778,7 +777,6 @@ private InputStream wrap(final InputStream in, final MessageDigest digest) {
private class TaskManager {

private final ExecutorService executorService;
private final BlockingQueue<Runnable> workQueue;
private final AtomicLong count;
private final Object lock;
private boolean shutdown = false;
Expand All @@ -794,10 +792,14 @@ public TaskManager(final Integer threadCount) {

logger.info("Using {} threads to export resources", threads);

this.workQueue = new LinkedBlockingQueue<>();
this.executorService = new ThreadPoolExecutor(threads, threads,
0L, TimeUnit.MILLISECONDS,
workQueue);
new LinkedBlockingQueue<>()) {
@Override
protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
return new FutureTaskWithCallable<>(callable);
}
};
this.count = new AtomicLong(0);
this.lock = new Object();

Expand Down Expand Up @@ -875,12 +877,11 @@ public synchronized void shutdown() {
shutdown = true;

try {
executorService.shutdown();
drainTasks();
final List<Runnable> remaining = executorService.shutdownNow();
logRemainingTasks(remaining);
logger.info("Waiting for inflight tasks to complete...");
if (!executorService.awaitTermination(5, TimeUnit.MINUTES)) {
logger.warn("Failed to shutdown executor service cleanly after 5 minutes of waiting");
executorService.shutdownNow();
}
} catch (InterruptedException e) {
logger.warn("Failed to shutdown executor service cleanly");
Expand All @@ -891,25 +892,35 @@ public synchronized void shutdown() {
}

/**
* Empties the queue of unprocessed tasks and adds them to the remaining log
* Adds remaining tasks to log
*/
private void drainTasks() {
final List<Runnable> remaining = new ArrayList<>(workQueue.size());
workQueue.drainTo(remaining);
@SuppressWarnings("unchecked")
private void logRemainingTasks(final List<Runnable> remaining) {
remaining.forEach(task -> {
try {
// Dirty hack to get original callable
final Field field = FutureTask.class.getDeclaredField("callable");
field.setAccessible(true);
final ExportTask inner = (ExportTask) field.get(task);
remainingLogger.error("{}", inner.uri);
final ExportTask callable = (ExportTask) ((FutureTaskWithCallable<Void>) task).callable;
remainingLogger.error("{}", callable.uri);
} catch (Exception e) {
logger.warn("Failed to extract unprocessed resource URI", e);
}
});
}
}

/**
* FutureTask extension that supports accessing the wrapped Callable
*/
private static class FutureTaskWithCallable<V> extends FutureTask<V> {

private final Callable<V> callable;

public FutureTaskWithCallable(final Callable<V> callable) {
super(callable);
this.callable = callable;
}

}

private static class ExportTask implements Callable<Void> {
private final URI uri;
private final Runnable runnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import static org.fcrepo.importexport.common.FcrepoConstants.TIMEMAP;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.isA;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import static org.fcrepo.importexport.common.FcrepoConstants.REPOSITORY_ROOT;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.fcrepo.importexport.test.util;

import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down

0 comments on commit 121f78a

Please sign in to comment.