Skip to content

Commit

Permalink
Exposing AutoExecutor and CallbackExecutor directly (#4983)
Browse files Browse the repository at this point in the history
  • Loading branch information
sduskis authored Apr 18, 2019
1 parent 4a26abe commit 18aca86
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class MessageDispatcher {
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);

private final Executor executor;
private final SequentialExecutorService sequentialExecutor;
private final SequentialExecutorService.AutoExecutor sequentialExecutor;
private final ScheduledExecutorService systemExecutor;
private final ApiClock clock;

Expand Down Expand Up @@ -206,7 +206,7 @@ void sendAckOperations(
jobLock = new ReentrantLock();
messagesWaiter = new MessageWaiter();
this.clock = clock;
this.sequentialExecutor = new SequentialExecutorService(executor);
this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
}

void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class Publisher {
private final PublisherStub publisherStub;

private final ScheduledExecutorService executor;
private final SequentialExecutorService sequentialExecutor;
private final SequentialExecutorService.CallbackExecutor sequentialExecutor;
private final AtomicBoolean shutdown;
private final List<AutoCloseable> closeables;
private final MessageWaiter messagesWaiter;
Expand Down Expand Up @@ -127,7 +127,7 @@ private Publisher(Builder builder) throws IOException {
messagesBatchLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
executor = builder.executorProvider.getExecutor();
sequentialExecutor = new SequentialExecutorService(executor);
sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor);
if (builder.executorProvider.shouldAutoClose()) {
closeables =
Collections.<AutoCloseable>singletonList(new ExecutorAsBackgroundResource(executor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,36 +42,18 @@ interface CancellableRunnable extends Runnable {
* be run in parallel.
*/
final class SequentialExecutorService {
private static final Logger logger = Logger.getLogger(SequentialExecutorService.class.getName());

private final CallbackExecutor callbackExecutor;
private final AutoExecutor autoExecutor;

SequentialExecutorService(Executor executor) {
this.callbackExecutor = new CallbackExecutor(executor);
this.autoExecutor = new AutoExecutor(executor);
}

/**
* Runs asynchronous {@code Callable} tasks sequentially. If one of the tasks fails, other tasks
* with the same key that have not been executed will be cancelled.
*/
<T> ApiFuture<T> submit(final String key, final Callable<ApiFuture<T>> callable) {
return callbackExecutor.submit(key, callable);
}

/** Runs synchronous {@code Runnable} tasks sequentially. */
void submit(String key, Runnable runnable) {
autoExecutor.execute(key, runnable);
// This class is not directly usable.
private SequentialExecutorService() {
}

/**
* Internal implementation of SequentialExecutorService. Takes a serial stream of string keys and
* This Executor takes a serial stream of string keys and
* {@code Runnable} tasks, and runs the tasks with the same key sequentially. Tasks with the same
* key will be run only when its predecessor has been completed while tasks with different keys
* can be run in parallel.
*/
abstract static class SequentialExecutor {
private abstract static class SequentialExecutor {
// Maps keys to tasks.
protected final Map<String, Deque<Runnable>> tasksByKey;
protected final Executor executor;
Expand All @@ -81,7 +63,7 @@ private SequentialExecutor(Executor executor) {
this.tasksByKey = new HashMap<>();
}

void execute(final String key, Runnable task) {
protected void execute(final String key, Runnable task) {
Deque<Runnable> newTasks;
synchronized (tasksByKey) {
newTasks = tasksByKey.get(key);
Expand Down Expand Up @@ -110,11 +92,16 @@ protected void invokeCallback(final Deque<Runnable> tasks) {
}
}

private static class AutoExecutor extends SequentialExecutor {
static class AutoExecutor extends SequentialExecutor {
AutoExecutor(Executor executor) {
super(executor);
}

/** Runs synchronous {@code Runnable} tasks sequentially. */
void submit(String key, Runnable task) {
super.execute(key, task);
}

@Override
protected void execute(final String key, final Deque<Runnable> tasks) {
executor.execute(
Expand Down Expand Up @@ -142,12 +129,21 @@ private void invokeCallbackAndExecuteNext(final String key, final Deque<Runnable
}
}

private static class CallbackExecutor extends SequentialExecutor {
/**
* Runs asynchronous {@code Callable} tasks sequentially for the same key. If one of the tasks fails, other tasks
* with the same key that have not been executed will be cancelled.
*/
static class CallbackExecutor extends SequentialExecutor {
private static final Logger logger = Logger.getLogger(SequentialExecutorService.SequentialExecutor.class.getName());

CallbackExecutor(Executor executor) {
super(executor);
}

/**
* Runs asynchronous {@code Callable} tasks sequentially. If one of the tasks fails, other tasks
* with the same key that have not been executed will be cancelled.
* <p>
* This method does the following in a chain:
*
* <ol>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public void finish() {

@Test
public void testExecutorRunsNextTaskWhenPrevResponseReceived() throws Exception {
SequentialExecutorService sequentialExecutorService =
new SequentialExecutorService(executorProvider.getExecutor());
SequentialExecutorService.CallbackExecutor sequentialExecutorService =
new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor());
AsyncTaskCallable callable1 = new AsyncTaskCallable();
AsyncTaskCallable callable2 = new AsyncTaskCallable();
AsyncTaskCallable callable3 = new AsyncTaskCallable();
Expand All @@ -97,8 +97,8 @@ public void testExecutorRunsNextTaskWhenPrevResponseReceived() throws Exception

@Test
public void testExecutorRunsDifferentKeySimultaneously() throws Exception {
SequentialExecutorService sequentialExecutorService =
new SequentialExecutorService(executorProvider.getExecutor());
SequentialExecutorService.CallbackExecutor sequentialExecutorService =
new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor());
AsyncTaskCallable callable1 = new AsyncTaskCallable();
AsyncTaskCallable callable2 = new AsyncTaskCallable();
AsyncTaskCallable callable3 = new AsyncTaskCallable();
Expand Down Expand Up @@ -126,8 +126,8 @@ public void testExecutorRunsDifferentKeySimultaneously() throws Exception {

@Test
public void testExecutorCancelsAllTasksWhenOneFailed() throws Exception {
SequentialExecutorService sequentialExecutorService =
new SequentialExecutorService(executorProvider.getExecutor());
SequentialExecutorService.CallbackExecutor sequentialExecutorService =
new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor());
AsyncTaskCallable callable1 = new AsyncTaskCallable();
AsyncTaskCallable callable2 = new AsyncTaskCallable();
AsyncTaskCallable callable3 = new AsyncTaskCallable();
Expand Down Expand Up @@ -207,8 +207,8 @@ public void run() {
public void SequentialExecutorRunsTasksAutomatically() throws Exception {
int numKeys = 100;
int numTasks = 100;
SequentialExecutorService sequentialExecutor =
new SequentialExecutorService(executorProvider.getExecutor());
SequentialExecutorService.AutoExecutor sequentialExecutor =
new SequentialExecutorService.AutoExecutor(executorProvider.getExecutor());
CountDownLatch remainingTasksCount = new CountDownLatch(numKeys * numTasks);
// Maps keys to lists of started and completed tasks.
Map<String, LinkedHashSet<Integer>> startedTasks = new HashMap<>();
Expand Down

0 comments on commit 18aca86

Please sign in to comment.