From e01c509c7c09aaf774a9649910578290dcc9ccbb Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Thu, 18 Apr 2019 13:29:54 -0400 Subject: [PATCH] More refactoring to SequentialExecutor - Moving common async code into a new method: `callNextTaskAsync()` - Adding a `postTaskExecution()` method which is useful for `AutoExecutor` --- .../pubsub/v1/SequentialExecutorService.java | 82 +++++++++---------- 1 file changed, 37 insertions(+), 45 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 892d9f6420c7..d1255547068d 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -21,6 +21,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.core.BetaApi; import com.google.api.core.SettableApiFuture; import java.util.Deque; import java.util.HashMap; @@ -44,14 +45,12 @@ interface CancellableRunnable extends Runnable { final class SequentialExecutorService { // This class is not directly usable. - private SequentialExecutorService() { - } + private SequentialExecutorService() {} /** - * This Executor takes a serial stream of string keys and - * {@code Runnable} tasks, and runs the tasks with the same key sequentially. Tasks with the same - * key will be run only when its predecessor has been completed while tasks with different keys - * can be run in parallel. + * This Executor takes a serial stream of string keys and {@code Runnable} tasks, and runs the + * tasks with the same key sequentially. Tasks with the same key will be run only when its + * predecessor has been completed while tasks with different keys can be run in parallel. */ private abstract static class SequentialExecutor { // Maps keys to tasks. @@ -78,43 +77,44 @@ protected void execute(final String key, Runnable task) { tasksByKey.put(key, newTasks); } - execute(key, newTasks); + callNextTaskAsync(key, newTasks); } - protected abstract void execute(String key, Deque finalTasks); + protected void callNextTaskAsync(final String key, final Deque tasks) { + executor.execute( + new Runnable() { + @Override + public void run() { + // TODO(kimkyung-goog): Check if there is a race when task list becomes empty. + Runnable task = tasks.poll(); + if (task != null) { + task.run(); + postTaskExecution(key, tasks); + } + } + }); + } - protected void invokeCallback(final Deque tasks) { - // TODO(kimkyung-goog): Check if there is a race when task list becomes empty. - Runnable task = tasks.poll(); - if (task != null) { - task.run(); - } + protected void postTaskExecution(String key, Deque tasks) { + // Do nothing in this class, but provide an opportunity for a subclass to do something + // interesting. } } + @BetaApi static class AutoExecutor extends SequentialExecutor { AutoExecutor(Executor executor) { super(executor); } /** Runs synchronous {@code Runnable} tasks sequentially. */ - void submit(String key, Runnable task) { + void submit(String key, Runnable task) { super.execute(key, task); } @Override - protected void execute(final String key, final Deque tasks) { - executor.execute( - new Runnable() { - @Override - public void run() { - invokeCallbackAndExecuteNext(key, tasks); - } - }); - } - - private void invokeCallbackAndExecuteNext(final String key, final Deque tasks) { - invokeCallback(tasks); + /** Once a task is done, automatically run the next task in the queue. */ + protected void postTaskExecution(final String key, final Deque tasks) { synchronized (tasksByKey) { if (tasks.isEmpty()) { // Note that there can be a race if a task is added to `tasks` at this point. However, @@ -125,16 +125,19 @@ private void invokeCallbackAndExecuteNext(final String key, final Deque - * This method does the following in a chain: + * + *

This method does the following in a chain: * *

    *
  1. Creates an `ApiFuture` that can be used for tracking progress. @@ -227,17 +230,6 @@ public void cancel(Throwable e) { return future; } - @Override - protected void execute(final String key, final Deque tasks) { - executor.execute( - new Runnable() { - @Override - public void run() { - invokeCallback(tasks); - } - }); - } - /** Executes the next queued task associated with {@code key}. */ private void resume(String key) { Deque tasks; @@ -251,7 +243,7 @@ private void resume(String key) { return; } } - execute(key, tasks); + callNextTaskAsync(key, tasks); } /** Cancels every task in the queue assoicated with {@code key}. */