From b2c8f82e3067efe89a591589509dbb821dde97df Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Wed, 17 Apr 2019 11:37:51 -0400 Subject: [PATCH 1/4] Refactoring SequentialExecutorService Step 1: - create a `CallbackExecutor` and `AutoExecutor` as subclasses of SequentialExecutor. --- .../pubsub/v1/SequentialExecutorService.java | 50 ++++++++----------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index c9557fb65c77..1524810c8212 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -31,7 +31,7 @@ import java.util.logging.Logger; interface CancellableRunnable extends Runnable { - public void cancel(Throwable e); + void cancel(Throwable e); } /** @@ -42,13 +42,12 @@ interface CancellableRunnable extends Runnable { final class SequentialExecutorService { private static final Logger logger = Logger.getLogger(SequentialExecutorService.class.getName()); - private final SequentialExecutor manageableSequentialExecutor; - private final SequentialExecutor autoSequentialExecutor; + private final SequentialExecutor callbackExecutor; + private final SequentialExecutor autoExecutor; SequentialExecutorService(Executor executor) { - this.manageableSequentialExecutor = - SequentialExecutor.newManageableSequentialExecutor(executor); - this.autoSequentialExecutor = SequentialExecutor.newAutoSequentialExecutor(executor); + this.callbackExecutor = new CallbackExecutor(executor); + this.autoExecutor = new AutoExecutor(executor); } /** @@ -57,7 +56,7 @@ final class SequentialExecutorService { */ ApiFuture submit(final String key, final Callable callable) { final SettableApiFuture future = SettableApiFuture.create(); - manageableSequentialExecutor.execute( + callbackExecutor.execute( key, new CancellableRunnable() { private boolean cancelled = false; @@ -75,13 +74,13 @@ public void run() { @Override public void onSuccess(T msg) { future.set(msg); - manageableSequentialExecutor.resume(key); + callbackExecutor.resume(key); } @Override public void onFailure(Throwable e) { future.setException(e); - manageableSequentialExecutor.cancelQueuedTasks( + callbackExecutor.cancelQueuedTasks( key, new CancellationException( "Execution cancelled because executing previous runnable failed.")); @@ -103,7 +102,7 @@ public void cancel(Throwable e) { /** Runs synchronous {@code Runnable} tasks sequentially. */ void submit(final String key, final Runnable runnable) { - autoSequentialExecutor.execute(key, runnable); + autoExecutor.execute(key, runnable); } /** @@ -112,7 +111,7 @@ void submit(final String key, final Runnable runnable) { * key will be run only when its predecessor has been completed while tasks with different keys * can be run in parallel. */ - static class SequentialExecutor { + static abstract class SequentialExecutor { // Maps keys to tasks. private final Map> tasksByKey; private final Executor executor; @@ -124,23 +123,6 @@ enum TaskCompleteAction { private TaskCompleteAction taskCompleteAction; - /** - * Creates a AutoSequentialExecutor which executes the next queued task automatically when the - * previous task has completed. - */ - static SequentialExecutor newAutoSequentialExecutor(Executor executor) { - return new SequentialExecutor(executor, TaskCompleteAction.EXECUTE_NEXT_TASK); - } - - /** - * Creates a ManageableSequentialExecutor which allows users to decide when to execute the next - * queued task. The first queued task is executed immediately, but the following tasks will be - * executed only when {@link #resume(String)} is called explicitly. - */ - static SequentialExecutor newManageableSequentialExecutor(Executor executor) { - return new SequentialExecutor(executor, TaskCompleteAction.WAIT_UNTIL_RESUME); - } - private SequentialExecutor(Executor executor, TaskCompleteAction taskCompleteAction) { this.executor = executor; this.taskCompleteAction = taskCompleteAction; @@ -260,4 +242,16 @@ public void run() { }); } } + + private static class AutoExecutor extends SequentialExecutor { + AutoExecutor(Executor executor) { + super(executor, TaskCompleteAction.EXECUTE_NEXT_TASK); + } + } + + private static class CallbackExecutor extends SequentialExecutor { + CallbackExecutor(Executor executor) { + super(executor, TaskCompleteAction.WAIT_UNTIL_RESUME); + } + } } From fdefaa2799270af02d643e17e4548a1f3dc25a7d Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Wed, 17 Apr 2019 11:45:40 -0400 Subject: [PATCH 2/4] Adding CallbackExecutor.submit() for encapsulation --- .../pubsub/v1/SequentialExecutorService.java | 98 ++++++++++--------- 1 file changed, 51 insertions(+), 47 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 1524810c8212..f6ca9cd86ed2 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -30,6 +30,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + interface CancellableRunnable extends Runnable { void cancel(Throwable e); } @@ -42,8 +44,8 @@ interface CancellableRunnable extends Runnable { final class SequentialExecutorService { private static final Logger logger = Logger.getLogger(SequentialExecutorService.class.getName()); - private final SequentialExecutor callbackExecutor; - private final SequentialExecutor autoExecutor; + private final CallbackExecutor callbackExecutor; + private final AutoExecutor autoExecutor; SequentialExecutorService(Executor executor) { this.callbackExecutor = new CallbackExecutor(executor); @@ -55,58 +57,16 @@ final class SequentialExecutorService { * with the same key that have not been executed will be cancelled. */ ApiFuture submit(final String key, final Callable callable) { - final SettableApiFuture future = SettableApiFuture.create(); - callbackExecutor.execute( - key, - new CancellableRunnable() { - private boolean cancelled = false; - - @Override - public void run() { - if (cancelled) { - return; - } - try { - ApiFuture callResult = callable.call(); - ApiFutures.addCallback( - callResult, - new ApiFutureCallback() { - @Override - public void onSuccess(T msg) { - future.set(msg); - callbackExecutor.resume(key); - } - - @Override - public void onFailure(Throwable e) { - future.setException(e); - callbackExecutor.cancelQueuedTasks( - key, - new CancellationException( - "Execution cancelled because executing previous runnable failed.")); - } - }); - } catch (Exception e) { - future.setException(e); - } - } - - @Override - public void cancel(Throwable e) { - this.cancelled = true; - future.setException(e); - } - }); - return future; + return callbackExecutor.submit(key, callable); } /** Runs synchronous {@code Runnable} tasks sequentially. */ - void submit(final String key, final Runnable runnable) { + void submit( String key, Runnable runnable) { autoExecutor.execute(key, runnable); } /** - * Internal implemenation of SequentialExecutorService. Takes a serial stream of string keys and + * Internal implementation of SequentialExecutorService. Takes a serial stream of string keys and * {@code Runnable} tasks, and runs the tasks with the same key sequentially. Tasks with the same * key will be run only when its predecessor has been completed while tasks with different keys * can be run in parallel. @@ -253,5 +213,49 @@ private static class CallbackExecutor extends SequentialExecutor { CallbackExecutor(Executor executor) { super(executor, TaskCompleteAction.WAIT_UNTIL_RESUME); } + + ApiFuture submit(final String key, final Callable callable) { + final SettableApiFuture future = SettableApiFuture.create(); + execute( + key, + new CancellableRunnable() { + private boolean cancelled = false; + + @Override + public void run() { + if (cancelled) { + return; + } + try { + ApiFuture callResult = callable.call(); + ApiFutures.addCallback(callResult, new ApiFutureCallback() { + @Override + public void onSuccess(T msg) { + future.set(msg); + resume(key); + } + + @Override + public void onFailure(Throwable e) { + future.setException(e); + cancelQueuedTasks( + key, + new CancellationException( + "Execution cancelled because executing previous runnable failed.")); + } + }, directExecutor()); + } catch (Exception e) { + future.setException(e); + } + } + + @Override + public void cancel(Throwable e) { + this.cancelled = true; + future.setException(e); + } + }); + return future; + } } } From 50b8dc023a474b8c0c02f297b06de259bae2af3d Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Wed, 17 Apr 2019 11:57:31 -0400 Subject: [PATCH 3/4] Moving resume into `CallbackExecutor` --- .../pubsub/v1/SequentialExecutorService.java | 58 +++++++++---------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index f6ca9cd86ed2..d5a627d2d873 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -73,8 +73,8 @@ void submit( String key, Runnable runnable) { */ static abstract class SequentialExecutor { // Maps keys to tasks. - private final Map> tasksByKey; - private final Executor executor; + protected final Map> tasksByKey; + protected final Executor executor; enum TaskCompleteAction { EXECUTE_NEXT_TASK, @@ -145,35 +145,7 @@ void cancelQueuedTasks(final String key, Throwable e) { } } - /** Executes the next queued task associated with {@code key}. */ - void resume(final String key) { - if (taskCompleteAction.equals(TaskCompleteAction.EXECUTE_NEXT_TASK)) { - // resume() is no-op since tasks are executed automatically. - return; - } - Deque tasks; - synchronized (tasksByKey) { - tasks = tasksByKey.get(key); - if (tasks == null) { - return; - } - if (tasks.isEmpty()) { - tasksByKey.remove(key); - return; - } - } - final Deque finalTasks = tasks; - // Run the next task. - executor.execute( - new Runnable() { - @Override - public void run() { - invokeCallback(finalTasks); - } - }); - } - - private void invokeCallback(final Deque tasks) { + protected void invokeCallback(final Deque tasks) { // TODO(kimkyung-goog): Check if there is a race when task list becomes empty. Runnable task = tasks.poll(); if (task != null) { @@ -257,5 +229,29 @@ public void cancel(Throwable e) { }); return future; } + + /** Executes the next queued task associated with {@code key}. */ + void resume(final String key) { + Deque tasks; + synchronized (tasksByKey) { + tasks = tasksByKey.get(key); + if (tasks == null) { + return; + } + if (tasks.isEmpty()) { + tasksByKey.remove(key); + return; + } + } + final Deque finalTasks = tasks; + // Run the next task. + executor.execute( + new Runnable() { + @Override + public void run() { + invokeCallback(finalTasks); + } + }); + } } } From 4b212e0bae1e655ca28dc3aeaa3d79d698686878 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Wed, 17 Apr 2019 12:40:21 -0400 Subject: [PATCH 4/4] create an abstract class for execute(key, deque) This removest the last bit of code that directly used the TaskCompleteAction enum. --- .../pubsub/v1/SequentialExecutorService.java | 52 ++++++++----------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index d5a627d2d873..d92b58ba5989 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -76,16 +76,8 @@ static abstract class SequentialExecutor { protected final Map> tasksByKey; protected final Executor executor; - enum TaskCompleteAction { - EXECUTE_NEXT_TASK, - WAIT_UNTIL_RESUME, - } - - private TaskCompleteAction taskCompleteAction; - - private SequentialExecutor(Executor executor, TaskCompleteAction taskCompleteAction) { + private SequentialExecutor(Executor executor) { this.executor = executor; - this.taskCompleteAction = taskCompleteAction; this.tasksByKey = new HashMap<>(); } @@ -104,25 +96,11 @@ void execute(final String key, Runnable task) { tasksByKey.put(key, newTasks); } - final Deque finalTasks = newTasks; - executor.execute( - new Runnable() { - @Override - public void run() { - switch (taskCompleteAction) { - case EXECUTE_NEXT_TASK: - invokeCallbackAndExecuteNext(key, finalTasks); - break; - case WAIT_UNTIL_RESUME: - invokeCallback(finalTasks); - break; - default: - // Nothing to do. - } - } - }); + execute(key, newTasks); } + protected abstract void execute(String key, Deque finalTasks); + /** Cancels every task in the queue assoicated with {@code key}. */ void cancelQueuedTasks(final String key, Throwable e) { // TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked, @@ -153,7 +131,7 @@ protected void invokeCallback(final Deque tasks) { } } - private void invokeCallbackAndExecuteNext(final String key, final Deque tasks) { + protected void invokeCallbackAndExecuteNext(final String key, final Deque tasks) { invokeCallback(tasks); synchronized (tasksByKey) { if (tasks.isEmpty()) { @@ -177,13 +155,21 @@ public void run() { private static class AutoExecutor extends SequentialExecutor { AutoExecutor(Executor executor) { - super(executor, TaskCompleteAction.EXECUTE_NEXT_TASK); + super(executor); + } + + protected void execute(final String key, final Deque finalTasks) { + executor.execute(new Runnable() { + @Override public void run() { + invokeCallbackAndExecuteNext(key, finalTasks); + } + }); } } private static class CallbackExecutor extends SequentialExecutor { CallbackExecutor(Executor executor) { - super(executor, TaskCompleteAction.WAIT_UNTIL_RESUME); + super(executor); } ApiFuture submit(final String key, final Callable callable) { @@ -230,6 +216,14 @@ public void cancel(Throwable e) { return future; } + protected void execute(final String key, final Deque finalTasks) { + executor.execute(new Runnable() { + @Override public void run() { + invokeCallback(finalTasks); + } + }); + } + /** Executes the next queued task associated with {@code key}. */ void resume(final String key) { Deque tasks;