diff --git a/api/src/main/resources/log4j.properties b/api/src/main/resources/log4j.properties index 7d76ef7e..94e13c55 100644 --- a/api/src/main/resources/log4j.properties +++ b/api/src/main/resources/log4j.properties @@ -2,7 +2,7 @@ log4j.rootLogger=TRACE, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n +log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{1}:%L - %m%n # Mute Undertow and its dependencies log4j.logger.io.undertow=WARN log4j.logger.org.jboss=WARN diff --git a/common/src/main/java/xyz/gianlu/librespot/common/AsyncProcessor.java b/common/src/main/java/xyz/gianlu/librespot/common/AsyncProcessor.java new file mode 100644 index 00000000..dbd7b7c6 --- /dev/null +++ b/common/src/main/java/xyz/gianlu/librespot/common/AsyncProcessor.java @@ -0,0 +1,57 @@ +package xyz.gianlu.librespot.common; + +import org.apache.log4j.Logger; +import org.jetbrains.annotations.NotNull; + +import java.io.Closeable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * Simple worker thread that processes tasks sequentially + * + * @param The type of task/input that AsyncProcessor handles. + * @param Return type of our processor implementation + */ +public class AsyncProcessor implements Closeable { + private static final Logger LOGGER = Logger.getLogger(AsyncProcessor.class); + private final String name; + private final Function processor; + private final ExecutorService executor; + + /** + * @param name name of async processor - used for thread name and logging + * @param processor actual processing implementation ran on background thread + */ + public AsyncProcessor(@NotNull String name, @NotNull Function processor) { + executor = Executors.newSingleThreadExecutor(new NameThreadFactory(r -> name)); + this.name = name; + this.processor = processor; + LOGGER.trace(String.format("AsyncProcessor{%s} has started", name)); + } + + public Future submit(@NotNull REQ task) { + return executor.submit(() -> processor.apply(task)); + } + + public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException { + if (!executor.isShutdown()) + throw new IllegalStateException(String.format("AsyncProcessor{%s} hasn't been shut down yet", name)); + + if (executor.awaitTermination(timeout, unit)) { + LOGGER.trace(String.format("AsyncProcessor{%s} is shut down", name)); + return true; + } else { + return false; + } + } + + @Override + public void close() { + LOGGER.trace(String.format("AsyncProcessor{%s} is shutting down", name)); + executor.shutdown(); + } +} diff --git a/common/src/main/java/xyz/gianlu/librespot/common/AsyncWorker.java b/common/src/main/java/xyz/gianlu/librespot/common/AsyncWorker.java index d2a4c142..33da689f 100644 --- a/common/src/main/java/xyz/gianlu/librespot/common/AsyncWorker.java +++ b/common/src/main/java/xyz/gianlu/librespot/common/AsyncWorker.java @@ -1,52 +1,38 @@ package xyz.gianlu.librespot.common; -import org.apache.log4j.Logger; import org.jetbrains.annotations.NotNull; import java.io.Closeable; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -public class AsyncWorker implements Closeable, Runnable { - private static final Logger LOGGER = Logger.getLogger(AsyncWorker.class); - private final Thread thread; - private final BlockingQueue internalQueue; - private final String name; - private final Consumer consumer; - private volatile boolean running = true; +/** + * Wrapper around AsyncProcessor that deals with void methods and does not expect a response type + * + * @param Task type for processor + */ +public class AsyncWorker implements Closeable { + private final AsyncProcessor underlyingProcessor; public AsyncWorker(@NotNull String name, @NotNull Consumer consumer) { - this.name = name; - this.consumer = consumer; - - internalQueue = new LinkedBlockingQueue<>(); - thread = new Thread(this, name); - thread.start(); + this.underlyingProcessor = new AsyncProcessor<>(name, t -> { + consumer.accept(t); + return null; + }); } - public void submit(@NotNull T task) { - internalQueue.add(task); + @NotNull + public Future submit(@NotNull T task) { + return underlyingProcessor.submit(task); } - @Override - public void close() { - running = false; - thread.interrupt(); + public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException { + return underlyingProcessor.awaitTermination(timeout, unit); } @Override - public void run() { - LOGGER.trace(String.format("AsyncWorker{%s} is starting", name)); - - while (running) { - try { - T polled = internalQueue.take(); - consumer.accept(polled); - } catch (InterruptedException ignored) { - } - } - - LOGGER.trace(String.format("AsyncWorker{%s} is shutting down", name)); + public void close() { + underlyingProcessor.close(); } } diff --git a/common/src/main/resources/log4j.properties b/common/src/main/resources/log4j.properties new file mode 100644 index 00000000..cca8bdca --- /dev/null +++ b/common/src/main/resources/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=TRACE, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{1}:%L - %m%n \ No newline at end of file diff --git a/common/src/test/java/xyz/gianlu/librespot/common/AsyncProcessorTest.java b/common/src/test/java/xyz/gianlu/librespot/common/AsyncProcessorTest.java new file mode 100644 index 00000000..f0306f18 --- /dev/null +++ b/common/src/test/java/xyz/gianlu/librespot/common/AsyncProcessorTest.java @@ -0,0 +1,55 @@ +package xyz.gianlu.librespot.common; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +public class AsyncProcessorTest { + + @Test + void testAsyncProcessor() throws ExecutionException, InterruptedException { + AtomicInteger internalState = new AtomicInteger(); + AsyncProcessor asyncProcessor = new AsyncProcessor<>("test-processor-1", internalState::addAndGet); + asyncProcessor.submit(1); + asyncProcessor.submit(2); + asyncProcessor.submit(3); + Future lastTask = asyncProcessor.submit(4); + + Integer lastResult = lastTask.get(); // we only need to wait for the last one as tasks are executed in order + Assertions.assertEquals(10, internalState.get()); + Assertions.assertEquals(10, lastResult); + asyncProcessor.close(); + } + + @Test + void testAsyncProcessorExceptionHandling() { + AsyncProcessor asyncProcessor = new AsyncProcessor<>("test-processor-2", i -> { + throw new IllegalStateException(); + }); + + Future firstTask = asyncProcessor.submit(1); + Assertions.assertThrows(ExecutionException.class, firstTask::get); + + // now we check our loop didn't break and we are able to submit more tasks to our queue + Future secondTask = asyncProcessor.submit(1); + Assertions.assertThrows(ExecutionException.class, secondTask::get); + asyncProcessor.close(); + } + + @Test + void testAsyncProcessorFailAfterShutdown() throws ExecutionException, InterruptedException { + AtomicInteger internalState = new AtomicInteger(); + AsyncProcessor asyncProcessor = new AsyncProcessor<>("test-processor-3", internalState::addAndGet); + + Future taskBeforeShutdown = asyncProcessor.submit(1); + Assertions.assertEquals(1, taskBeforeShutdown.get()); + + asyncProcessor.close(); + + Assertions.assertThrows(RejectedExecutionException.class, () -> asyncProcessor.submit(1)); + } +} diff --git a/core/src/main/resources/log4j.properties b/core/src/main/resources/log4j.properties index 7091d55b..cca8bdca 100644 --- a/core/src/main/resources/log4j.properties +++ b/core/src/main/resources/log4j.properties @@ -2,4 +2,4 @@ log4j.rootLogger=TRACE, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n \ No newline at end of file +log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{1}:%L - %m%n \ No newline at end of file