Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated AsyncWorker (and defined AsyncProcessor) so that we get back … #212

Merged
merged 4 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ log4j.rootLogger=TRACE, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{1}:%L - %m%n
devgianlu marked this conversation as resolved.
Show resolved Hide resolved
# Mute Undertow and its dependencies
log4j.logger.io.undertow=WARN
log4j.logger.org.jboss=WARN
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package xyz.gianlu.librespot.common;

import org.apache.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import java.io.Closeable;
import java.util.concurrent.*;
import java.util.function.Function;

/**
* Simple worker thread that processes tasks sequentially
* @param <REQ> The type of task/input that AsyncProcessor handles.
* @param <RES> Return type of our processor implementation
*/
public class AsyncProcessor<REQ, RES> implements Closeable {

private static final Logger LOGGER = Logger.getLogger(AsyncProcessor.class);
private final String name;
private final Function<REQ, RES> processor;
private final ExecutorService executor;
private volatile boolean running;

/**
* @param name name of async processor - used for thread name and logging
* @param processor actual processing implementation ran on background thread
*/
public AsyncProcessor(@NotNull String name, @NotNull Function<REQ, RES> processor) {
executor = Executors.newSingleThreadExecutor(new NameThreadFactory(r -> name));
this.name = name;
this.processor = processor;
running = true;
LOGGER.trace(String.format("AsyncProcessor %s has started", name));
}

public Future<RES> submit(@NotNull REQ task) {
if (!running) {
throw new IllegalStateException(String.format("AsyncProcessor %s has already been shutdown", name));
}
return executor.submit(() -> processor.apply(task));
}

@Override
public void close() {
running = false;
LOGGER.trace(String.format("AsyncProcessor %s is shutting down", name));
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOGGER.trace(String.format("AsyncProcessor %s was interrupted during shutdown", name));
}
LOGGER.trace(String.format("AsyncProcessor %s is shut down", name));
}

}
51 changes: 16 additions & 35 deletions common/src/main/java/xyz/gianlu/librespot/common/AsyncWorker.java
Original file line number Diff line number Diff line change
@@ -1,52 +1,33 @@
package xyz.gianlu.librespot.common;

import org.apache.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class AsyncWorker<T> implements Closeable, Runnable {
private static final Logger LOGGER = Logger.getLogger(AsyncWorker.class);
private final Thread thread;
private final BlockingQueue<T> internalQueue;
private final String name;
private final Consumer<T> consumer;
private volatile boolean running = true;
/**
* Wrapper around AsyncProcessor that deals with void methods and does not expect a response type
*
* @param <T> Task type for processor
*/
public class AsyncWorker<T> implements Closeable {

public AsyncWorker(@NotNull String name, @NotNull Consumer<T> consumer) {
this.name = name;
this.consumer = consumer;
private final AsyncProcessor<T, Void> underlyingProcessor;

internalQueue = new LinkedBlockingQueue<>();
thread = new Thread(this, name);
thread.start();
public AsyncWorker(@NotNull String name, @NotNull Consumer<T> consumer) {
this.underlyingProcessor = new AsyncProcessor<>(name, t -> {
consumer.accept(t);
return null;
});
}

public void submit(@NotNull T task) {
internalQueue.add(task);
public Future<Void> submit(@NotNull T task) {
return underlyingProcessor.submit(task);
}

@Override
public void close() {
running = false;
thread.interrupt();
}

@Override
public void run() {
LOGGER.trace(String.format("AsyncWorker{%s} is starting", name));

while (running) {
try {
T polled = internalQueue.take();
consumer.accept(polled);
} catch (InterruptedException ignored) {
}
}

LOGGER.trace(String.format("AsyncWorker{%s} is shutting down", name));
underlyingProcessor.close();
}
}
5 changes: 5 additions & 0 deletions common/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
log4j.rootLogger=TRACE, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{1}:%L - %m%n
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package xyz.gianlu.librespot.common;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncProcessorTest {

@Test
void testAsyncProcessor() throws ExecutionException, InterruptedException {
AtomicInteger internalState = new AtomicInteger();
AsyncProcessor<Integer, Integer> asyncProcessor = new AsyncProcessor<>("test-processor-1", internalState::addAndGet);
asyncProcessor.submit(1);
asyncProcessor.submit(2);
asyncProcessor.submit(3);
Future<Integer> lastTask = asyncProcessor.submit(4);

Integer lastResult = lastTask.get(); // we only need to wait for the last one as tasks are executed in order
Assertions.assertEquals(10, internalState.get());
Assertions.assertEquals(10, lastResult);
asyncProcessor.close();
}

@Test
void testAsyncProcessorExceptionHandling() {
AsyncProcessor<Integer, Integer> asyncProcessor = new AsyncProcessor<>("test-processor-2", i -> {
throw new IllegalStateException();
});

Future<Integer> firstTask = asyncProcessor.submit(1);
Assertions.assertThrows(ExecutionException.class, firstTask::get);

// now we check our loop didn't break and we are able to submit more tasks to our queue
Future<Integer> secondTask = asyncProcessor.submit(1);
Assertions.assertThrows(ExecutionException.class, secondTask::get);
asyncProcessor.close();
}

@Test
void testAsyncProcessorFailAfterShutdown() throws ExecutionException, InterruptedException {
AtomicInteger internalState = new AtomicInteger();
AsyncProcessor<Integer, Integer> asyncProcessor = new AsyncProcessor<>("test-processor-3", internalState::addAndGet);

Future<Integer> taskBeforeShutdown = asyncProcessor.submit(1);
Assertions.assertEquals(1, taskBeforeShutdown.get());

asyncProcessor.close();

IllegalStateException underlyingException = Assertions.assertThrows(IllegalStateException.class, () -> asyncProcessor.submit(1));
Assertions.assertEquals("AsyncProcessor test-processor-3 has already been shutdown", underlyingException.getMessage());
}

}
2 changes: 1 addition & 1 deletion core/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ log4j.rootLogger=TRACE, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c{1}:%L - %m%n